In [None]:
import numpy as np
import pandas as pd
from scipy.signal import butter, filtfilt
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# --------------------
# Config
# --------------------
CSV_FILE = "PPG.csv"
COL_RED = "RED_ADC"
COL_IR = "IR_ADC"
FS = 100                 # sampling frequency (Hz) — change if known otherwise
N = 10_000               # limit rows
INVERT_AC = True         # invert AC so pulses point up
BAND_LOW = 0.5           # Hz
BAND_HIGH = 5.0          # Hz

# --------------------
# Load first N rows
# --------------------
df = pd.read_csv(CSV_FILE, nrows=N)

# Ensure required columns exist
missing = [c for c in (COL_RED, COL_IR) if c not in df.columns]
if missing:
    raise ValueError(f"Missing columns in CSV: {missing}")

# (Optional) drop rows with NA in either channel
df = df[[COL_RED, COL_IR]].dropna()

red = df[COL_RED].to_numpy()
ir  = df[COL_IR].to_numpy()
t = np.arange(len(df)) / FS

# --------------------
# Bandpass filter (extract AC)
# --------------------
def extract_ac(signal, fs, low_hz=0.5, high_hz=5.0, order=2):
    nyq = fs / 2.0
    b, a = butter(order, [low_hz / nyq, high_hz / nyq], btype="band")
    return filtfilt(b, a, signal)

red_ac = extract_ac(red, FS, BAND_LOW, BAND_HIGH)
ir_ac  = extract_ac(ir,  FS, BAND_LOW, BAND_HIGH)

if INVERT_AC:
    red_ac = -red_ac
    ir_ac  = -ir_ac

# --------------------
# Simple SpO2 estimate (same curve as your example)
# --------------------
dc_red = float(np.mean(red))
dc_ir  = float(np.mean(ir))

# Peak-to-peak/2 amplitude of AC
ac_red_amp = float(np.ptp(red_ac) / 2.0)
ac_ir_amp  = float(np.ptp(ir_ac)  / 2.0)

if ac_red_amp > 0 and ac_ir_amp > 0 and dc_red > 0 and dc_ir > 0:
    R = (ac_red_amp / dc_red) / (ac_ir_amp / dc_ir)
    spo2 = -45.06 * R**2 + 30.354 * R + 94.845
    print(f"Estimated SpO2: {spo2:.2f}% (R={R:.4f})")
else:
    print("Insufficient AC/DC to estimate SpO2.")

# --------------------
# Plotly: raw waveforms
# --------------------
fig_raw = make_subplots(
    rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.08,
    subplot_titles=("Raw RED PPG", "Raw IR PPG")
)

fig_raw.add_trace(
    go.Scatter(x=t, y=red, name="Raw RED PPG", mode="lines"),
    row=1, col=1
)
fig_raw.add_trace(
    go.Scatter(x=t, y=ir, name="Raw IR PPG", mode="lines"),
    row=2, col=1
)

fig_raw.update_xaxes(title_text="Time (s)", row=2, col=1, rangeslider=dict(visible=True))
fig_raw.update_yaxes(title_text="ADC Value", row=1, col=1)
fig_raw.update_yaxes(title_text="ADC Value", row=2, col=1)
fig_raw.update_layout(
    title="PPG Raw Waveforms (first 10,000 rows)",
    height=650,
    hovermode="x unified"
)
fig_raw.show()

# --------------------
# Plotly: filtered AC waveforms
# --------------------
fig_ac = make_subplots(
    rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.08,
    subplot_titles=("Filtered AC RED PPG" + (" (inverted)" if INVERT_AC else ""),
                    "Filtered AC IR PPG" + (" (inverted)" if INVERT_AC else ""))
)

fig_ac.add_trace(
    go.Scatter(x=t, y=red_ac, name="AC RED", mode="lines"),
    row=1, col=1
)
fig_ac.add_trace(
    go.Scatter(x=t, y=ir_ac, name="AC IR", mode="lines"),
    row=2, col=1
)

fig_ac.update_xaxes(title_text="Time (s)", row=2, col=1, rangeslider=dict(visible=True))
fig_ac.update_yaxes(title_text="Amplitude", row=1, col=1)
fig_ac.update_yaxes(title_text="Amplitude", row=2, col=1)
fig_ac.update_layout(
    title=f"PPG AC Waveforms (band {BAND_LOW}-{BAND_HIGH} Hz, first {len(df)} rows)",
    height=650,
    hovermode="x unified"
)
fig_ac.show()

# (Optional) save to HTML
# fig_raw.write_html("ppg_raw.html", include_plotlyjs="cdn")
# fig_ac.write_html("ppg_ac.html", include_plotlyjs="cdn")


In [None]:
import numpy as np
import pandas as pd
from scipy.signal import butter, filtfilt, cheby1, ellip, firwin, savgol_filter, medfilt
from scipy.ndimage import gaussian_filter1d
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# --------------------
# Config
# --------------------
CSV_FILE = "PPG.csv"
COL_RED = "RED_ADC"
COL_IR = "IR_ADC"
FS = 100                 # sampling frequency (Hz) — change if known otherwise
N = 10_000               # limit rows
INVERT_AC = True         # invert AC so pulses point up
BAND_LOW = 0.5           # Hz (high-pass cutoff)
BAND_HIGH = 5.0          # Hz (low-pass cutoff)

# Filter config: choose type and settings
FILTER_TYPE = 'butterworth'  # Options: 'butterworth', 'chebyshev1', 'elliptic', 'fir', 'savgol', 'median', 'gaussian'
FILTER_ORDER = 2             # For IIR/FIR (higher = sharper cutoff)
RP = 1                       # Passband ripple (dB) for chebyshev1/elliptic
RS = 40                      # Stopband attenuation (dB) for elliptic
FIR_NUMTAPS = 101            # For FIR (odd number; higher = better but slower)
SAVGOL_WINDOW = 51           # For savgol (odd; larger = smoother)
SAVGOL_POLY = 3              # Polynomial order for savgol
MEDIAN_KERNEL = 5            # For median (odd; larger = more smoothing)
GAUSSIAN_SIGMA = 1.5         # For gaussian (higher = more blur)

# --------------------
# Load first N rows
# --------------------
df = pd.read_csv(CSV_FILE, nrows=N)

# Ensure required columns exist
missing = [c for c in (COL_RED, COL_IR) if c not in df.columns]
if missing:
    raise ValueError(f"Missing columns in CSV: {missing}")

# (Optional) drop rows with NA in either channel
df = df[[COL_RED, COL_IR]].dropna()

red = df[COL_RED].to_numpy()
ir  = df[COL_IR].to_numpy()
t = np.arange(len(df)) / FS

# --------------------
# Filter function with alternatives
# --------------------
def apply_filter(signal, fs, filter_type='butterworth', low_hz=0.5, high_hz=5.0, order=2, **kwargs):
    nyq = fs / 2.0
    low_norm = low_hz / nyq
    high_norm = high_hz / nyq
    
    if filter_type == 'butterworth':
        b, a = butter(order, [low_norm, high_norm], btype='band')
        return filtfilt(b, a, signal)
    
    elif filter_type == 'chebyshev1':
        rp = kwargs.get('rp', 1)  # Passband ripple
        b, a = cheby1(order, rp, [low_norm, high_norm], btype='band')
        return filtfilt(b, a, signal)
    
    elif filter_type == 'elliptic':
        rp = kwargs.get('rp', 1)
        rs = kwargs.get('rs', 40)  # Stopband attenuation
        b, a = ellip(order, rp, rs, [low_norm, high_norm], btype='band')
        return filtfilt(b, a, signal)
    
    elif filter_type == 'fir':
        numtaps = kwargs.get('numtaps', 101)
        b = firwin(numtaps, [low_norm, high_norm], window='hamming', pass_zero=False)
        return filtfilt(b, 1, signal)  # FIR: a=1
    
    elif filter_type == 'savgol':
        window = kwargs.get('window', 51)
        poly = kwargs.get('poly', 3)
        return savgol_filter(signal, window, poly)  # Smoothing, not strict band-pass
    
    elif filter_type == 'median':
        kernel = kwargs.get('kernel', 5)
        return medfilt(signal, kernel_size=kernel)  # Non-linear smoothing
    
    elif filter_type == 'gaussian':
        sigma = kwargs.get('sigma', 1.5)
        return gaussian_filter1d(signal, sigma)  # Low-pass smoothing
    
    else:
        raise ValueError(f"Unknown filter_type: {filter_type}")

# Apply selected filter
red_ac = apply_filter(red, FS, FILTER_TYPE, BAND_LOW, BAND_HIGH, FILTER_ORDER,
                      rp=RP, rs=RS, numtaps=FIR_NUMTAPS, window=SAVGOL_WINDOW, poly=SAVGOL_POLY,
                      kernel=MEDIAN_KERNEL, sigma=GAUSSIAN_SIGMA)
ir_ac = apply_filter(ir, FS, FILTER_TYPE, BAND_LOW, BAND_HIGH, FILTER_ORDER,
                     rp=RP, rs=RS, numtaps=FIR_NUMTAPS, window=SAVGOL_WINDOW, poly=SAVGOL_POLY,
                     kernel=MEDIAN_KERNEL, sigma=GAUSSIAN_SIGMA)

if INVERT_AC:
    red_ac = -red_ac
    ir_ac  = -ir_ac

# --------------------
# Simple SpO2 estimate (same curve as your example)
# --------------------
dc_red = float(np.mean(red))
dc_ir  = float(np.mean(ir))

# Peak-to-peak/2 amplitude of AC
ac_red_amp = float(np.ptp(red_ac) / 2.0)
ac_ir_amp  = float(np.ptp(ir_ac)  / 2.0)

if ac_red_amp > 0 and ac_ir_amp > 0 and dc_red > 0 and dc_ir > 0:
    R = (ac_red_amp / dc_red) / (ac_ir_amp / dc_ir)
    spo2 = -45.06 * R**2 + 30.354 * R + 94.845
    print(f"Estimated SpO2: {spo2:.2f}% (R={R:.4f})")
else:
    print("Insufficient AC/DC to estimate SpO2.")

# --------------------
# Plotly: raw waveforms
# --------------------
fig_raw = make_subplots(
    rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.08,
    subplot_titles=("Raw RED PPG", "Raw IR PPG")
)

fig_raw.add_trace(
    go.Scatter(x=t, y=red, name="Raw RED PPG", mode="lines"),
    row=1, col=1
)
fig_raw.add_trace(
    go.Scatter(x=t, y=ir, name="Raw IR PPG", mode="lines"),
    row=2, col=1
)

fig_raw.update_xaxes(title_text="Time (s)", row=2, col=1, rangeslider=dict(visible=True))
fig_raw.update_yaxes(title_text="ADC Value", row=1, col=1)
fig_raw.update_yaxes(title_text="ADC Value", row=2, col=1)
fig_raw.update_layout(
    title="PPG Raw Waveforms (first 10,000 rows)",
    height=650,
    hovermode="x unified"
)
fig_raw.show()

# --------------------
# Plotly: filtered AC waveforms
# --------------------
fig_ac = make_subplots(
    rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.08,
    subplot_titles=(f"Filtered AC RED PPG ({FILTER_TYPE}" + (" inverted" if INVERT_AC else "") + ")",
                    f"Filtered AC IR PPG ({FILTER_TYPE}" + (" inverted" if INVERT_AC else "") + ")")
)

fig_ac.add_trace(
    go.Scatter(x=t, y=red_ac, name="AC RED", mode="lines"),
    row=1, col=1
)
fig_ac.add_trace(
    go.Scatter(x=t, y=ir_ac, name="AC IR", mode="lines"),
    row=2, col=1
)

fig_ac.update_xaxes(title_text="Time (s)", row=2, col=1, rangeslider=dict(visible=True))
fig_ac.update_yaxes(title_text="Amplitude", row=1, col=1)
fig_ac.update_yaxes(title_text="Amplitude", row=2, col=1)
fig_ac.update_layout(
    title=f"PPG AC Waveforms (band {BAND_LOW}-{BAND_HIGH} Hz, {FILTER_TYPE}, first {len(df)} rows)",
    height=650,
    hovermode="x unified"
)
fig_ac.show()

# (Optional) save to HTML
# fig_raw.write_html("ppg_raw.html", include_plotlyjs="cdn")
# fig_ac.write_html("ppg_ac.html", include_plotlyjs="cdn")

# --------------------
# (Optional) Compare multiple filters: Uncomment to plot side-by-side
# --------------------
filter_configs = [
    {'type': 'butterworth', 'order': 2},
    {'type': 'chebyshev1', 'order': 2, 'rp': 1},
    {'type': 'elliptic', 'order': 2, 'rp': 1, 'rs': 40},
    {'type': 'fir', 'numtaps': 101},
    {'type': 'savgol', 'window': 51, 'poly': 3},
    {'type': 'median', 'kernel': 5},
    {'type': 'gaussian', 'sigma': 1.5}
]

fig_compare = make_subplots(rows=len(filter_configs), cols=1, shared_xaxes=True, vertical_spacing=0.05)
for i, config in enumerate(filter_configs, 1):
    red_ac_comp = apply_filter(red, FS, **config)
    if INVERT_AC:
        red_ac_comp = -red_ac_comp
    fig_compare.add_trace(go.Scatter(x=t, y=red_ac_comp, name=f"RED AC ({config['type']})", mode="lines"), row=i, col=1)
    fig_compare.update_yaxes(title_text="Amplitude", row=i, col=1)

fig_compare.update_xaxes(title_text="Time (s)", row=len(filter_configs), col=1)
fig_compare.update_layout(title="Comparison of RED AC Waveforms Across Filters", height=200 * len(filter_configs), showlegend=True)
fig_compare.show()

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import butter, filtfilt
import pandas as pd

df = pd.read_csv('PPG.csv')

red = df['RED_ADC'].values
ir = df['IR_ADC'].values

# red = np.array([205765, 205748, 205769, 205845, 205866, 205917, 205945, 205957, 205936, 205924, 205865, 205875, 205817, 205839, 205861, 205869, 205885, 205961, 205956, 205974, 206016, 206070, 206098, 206146, 206168, 206226, 206234, 206265, 206287, 206337, 206359, 206404, 206448, 206451, 206517, 206520, 206571, 206564, 206639, 206671, 206682, 206727, 206713, 206657, 206591, 206424, 206242, 205984, 205780, 205619, 205438, 205304, 205257, 205215, 205215, 205201, 205216, 205211, 205216, 205228, 205252, 205307, 205405, 205379, 205449, 205428, 205428, 205412, 205408, 205375, 205399, 205377, 205393, 205396, 205390, 205439, 205486, 205543, 205579, 205587, 205662, 205697, 205728, 205783, 205777, 205823, 205831, 205861, 205888, 205890, 205937, 205967, 206004, 206061, 206099, 206163, 206190, 206225, 206266, 206292, 206334, 206365, 206310, 206271, 206134, 205980, 205751, 205554, 205368, 205177, 205081, 204950, 204931, 204929, 204922, 204921, 204945, 205002, 205034, 205033, 205054, 205057, 205096, 205150, 205175, 205176, 205130, 205157, 205127, 205111, 205123, 205091, 205124, 205154, 205190, 205259, 205342, 205381, 205457, 205527, 205574, 205639, 205731, 205768, 205832, 205871, 205883, 205906, 205916, 205942, 205974, 205998, 206047, 206041, 206073, 206115, 206174, 206195, 206204, 206250, 206305, 206293, 206255, 206147, 205997, 205818, 205604, 205428, 205280, 205130, 205069, 205042, 205044, 205026, 205042, 205070, 205054, 205107, 205183, 205235, 205282, 205344, 205382, 205436, 205442, 205439, 205442, 205486, 205482, 205460, 205475, 205529, 205566, 205632, 205652, 205720, 205768, 205834, 205871, 205961, 206046, 206076, 206141, 206184, 206269, 206279, 206339, 206396, 206475, 206531, 206600, 206675, 206725, 206831, 206900, 206974, 207022, 207105, 207229, 207285, 207330, 207357, 207262, 207100, 206948, 206745, 206540, 206378, 206284, 206213, 206165, 206140, 206164, 206226, 206278, 206298, 206404, 206424, 206479, 206564, 206613, 206683, 206737, 206773, 206770, 206756, 206755, 206725, 206733, 206647, 206727, 206779, 206809, 206826, 206853, 206901, 206969, 207001, 207073, 207018, 207154, 207210, 207208, 207247, 207308, 207384])

# ir = np.array([176980, 177006, 177042, 177079, 177152, 177236, 177272, 177294, 177291, 177297, 177268, 177250, 177222, 177218, 177232, 177246, 177301, 177355, 177400, 177442, 177520, 177573, 177660, 177694, 177778, 177842, 177871, 177935, 178010, 178063, 178063, 178174, 178225, 178236, 178333, 178403, 178463, 178539, 178587, 178649, 178730, 178738, 178728, 178680, 178530, 178270, 177936, 177576, 177260, 176940, 176661, 176487, 176361, 176306, 176271, 176276, 176286, 176296, 176325, 176337, 176423, 176456, 176588, 176611, 176676, 176715, 176718, 176720, 176671, 176658, 176671, 176622, 176656, 176694, 176715, 176767, 176821, 176889, 176928, 177000, 177114, 177155, 177227, 177285, 177339, 177386, 177435, 177507, 177538, 177549, 177613, 177717, 177766, 177815, 177906, 177965, 178043, 178124, 178193, 178248, 178333, 178347, 178289, 178169, 178000, 177690, 177353, 176999, 176695, 176397, 176201, 176085, 176014, 175986, 175984, 175997, 176074, 176084, 176124, 176150, 176223, 176291, 176382, 176458, 176478, 176481, 176475, 176466, 176413, 176420, 176379, 176406, 176434, 176454, 176511, 176613, 176701, 176828, 176908, 176998, 177117, 177221, 177311, 177397, 177437, 177543, 177559, 177614, 177687, 177729, 177789, 177819, 177877, 177947, 177978, 178057, 178132, 178204, 178233, 178310, 178344, 178357, 178297, 178143, 177887, 177588, 177221, 176914, 176623, 176421, 176286, 176212, 176189, 176215, 176214, 176255, 176293, 176345, 176442, 176505, 176616, 176695, 176790, 176873, 176909, 176912, 176939, 176939, 176924, 176912, 176968, 176995, 177042, 177102, 177195, 177280, 177354, 177443, 177524, 177644, 177759, 177838, 177953, 177995, 178087, 178123, 178235, 178279, 178364, 178498, 178598, 178705, 178807, 178922, 179021, 179139, 179264, 179378, 179459, 179556, 179610, 179601, 179419, 179180, 178920, 178557, 178220, 177962, 177788, 177620, 177560, 177555, 177557, 177622, 177703, 177802, 177886, 177948, 178048, 178169, 178288, 178386, 178471, 178476, 178522, 178476, 178496, 178466, 178468, 178437, 178449, 178469, 178563, 178590, 178651, 178742, 178813, 178872, 178975, 179074, 179160, 179204, 179294, 179374, 179426, 179483])

# Sampling frequency (adjust if known)
fs = 100
t = np.arange(len(red)) / fs  # Time in seconds

# Extract AC function
def extract_ac(signal, fs, low=0.5, high=5):
    nyq = fs / 2
    low = low / nyq
    high = high / nyq
    b, a = butter(2, [low, high], btype='band')
    return filtfilt(b, a, signal)

# Extract AC
red_ac = extract_ac(red, fs)
ir_ac = extract_ac(ir, fs)

# Invert for conventional upward peaks (set to False if you prefer original)
invert_ac = True
if invert_ac:
    red_ac = -red_ac
    ir_ac = -ir_ac

# Simple SpO2 estimate (empirical; calibrate for your sensor)
dc_red = np.mean(red)
dc_ir = np.mean(ir)
ac_red_amp = np.ptp(red_ac) / 2  # Peak-to-peak / 2 for amplitude
ac_ir_amp = np.ptp(ir_ac) / 2
if ac_red_amp > 0 and ac_ir_amp > 0:
    R = (ac_red_amp / dc_red) / (ac_ir_amp / dc_ir)
    spo2 = -45.06 * R**2 + 30.354 * R + 94.845  # Example calibration curve
    print(f"Estimated SpO2: {spo2:.2f}%")
else:
    print("Insufficient AC for SpO2")


# Plot raw waveforms
plt.figure(figsize=(12, 6))
plt.subplot(2, 1, 1)
plt.plot(t, red, 'r', label='Raw RED PPG')
plt.xlabel('Time (seconds)')
plt.ylabel('ADC Value')
plt.title('Raw RED PPG Waveform')
plt.legend()
plt.grid(True)

plt.subplot(2, 1, 2)
plt.plot(t, ir, 'b', label='Raw IR PPG')
plt.xlabel('Time (seconds)')
plt.ylabel('ADC Value')
plt.title('Raw IR PPG Waveform')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

# Plot filtered (and optionally inverted) AC waveforms
plt.figure(figsize=(12, 6))
plt.subplot(2, 1, 1)
plt.plot(t, red_ac, 'r', label='AC RED PPG (Inverted if enabled)')
plt.xlabel('Time (seconds)')
plt.ylabel('Amplitude')
plt.title('Filtered AC RED PPG Waveform')
plt.legend()
plt.grid(True)

plt.subplot(2, 1, 2)
plt.plot(t, ir_ac, 'b', label='AC IR PPG (Inverted if enabled)')
plt.xlabel('Time (seconds)')
plt.ylabel('Amplitude')
plt.title('Filtered AC IR PPG Waveform')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()

In [None]:
import numpy as np
from scipy.optimize import curve_fit
import matplotlib.pyplot as plt

# Sample calibration data: R ratios and corresponding SpO2 (%) from literature/examples
# Replace with your actual pairs (e.g., from experiments or datasets)
r_data = np.array([0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.2, 1.4, 1.6, 2.0])  # R values
spo2_data = np.array([100, 98, 96, 94, 92, 90, 85, 80, 75, 70, 60])     # Corresponding SpO2

# Quadratic model function
def quadratic(R, a, b, c):
    return a * R**2 + b * R + c

# Fit the model to data
popt, pcov = curve_fit(quadratic, r_data, spo2_data)
a, b, c = popt  # Dynamically estimated coefficients

print(f"Dynamic coefficients: a = {a:.2f}, b = {b:.2f}, c = {c:.2f}")

# Example usage: Compute SpO2 for a sample R=0.6
sample_r = 0.6
spo2_est = quadratic(sample_r, a, b, c)
print(f"Estimated SpO2 for R={sample_r}: {spo2_est:.2f}%")

# Plot to verify fit
r_fit = np.linspace(min(r_data), max(r_data), 100)
spo2_fit = quadratic(r_fit, *popt)
plt.scatter(r_data, spo2_data, label='Calibration Data')
plt.plot(r_fit, spo2_fit, 'r', label='Fitted Curve')
plt.xlabel('R Ratio')
plt.ylabel('SpO2 (%)')
plt.legend()
plt.show()

In [None]:
# app_ppg_dash.py
import numpy as np
import pandas as pd
from scipy.signal import iirfilter, sosfiltfilt
from pathlib import Path

import dash
from dash import dcc, html, Input, Output, State, callback_context
import plotly.graph_objects as go
from plotly.subplots import make_subplots

# --------------------
# Config
# --------------------
CSV_FILE = "PPG.csv"
COL_RED = "RED_ADC"
COL_IR = "IR_ADC"
FS_DEFAULT = 100.0        # Hz
N_ROWS = 10_000           # hard cap as requested

# --------------------
# Load data (first N rows)
# --------------------
if not Path(CSV_FILE).exists():
    raise FileNotFoundError(f"CSV not found: {CSV_FILE}")

df = pd.read_csv(CSV_FILE, nrows=N_ROWS)
missing = [c for c in (COL_RED, COL_IR) if c not in df.columns]
if missing:
    raise ValueError(f"Missing columns in CSV: {missing}")

df = df[[COL_RED, COL_IR]].dropna().reset_index(drop=True)
red = df[COL_RED].to_numpy(dtype=float)
ir  = df[COL_IR].to_numpy(dtype=float)

# --------------------
# Helpers
# --------------------
def design_and_apply_filter(x, fs, family, resp_type, low_hz, high_hz, order, rp, rs):
    """
    Build IIR filter as SOS and apply filtfilt. Handles LP/HP/BP/BS.
    family: 'butter', 'cheby1', 'cheby2', 'ellip', 'bessel'
    resp_type: 'lowpass' | 'highpass' | 'bandpass' | 'bandstop'
    """
    nyq = fs / 2.0
    # sanitize & map cutoffs
    if resp_type in ("lowpass", "highpass"):
        Wn = float(high_hz if resp_type == "lowpass" else low_hz)
        Wn = max(1e-6, min(Wn, nyq * 0.999))  # keep strictly < Nyquist
    else:
        lo = max(1e-6, min(low_hz, high_hz))
        hi = max(lo + 1e-6, high_hz)
        hi = min(hi, nyq * 0.999)
        Wn = [lo, hi]

    kwargs = dict(ftype=family, fs=fs, output="sos")
    # ripple params only apply to certain families
    if family == "cheby1":
        kwargs["rp"] = rp
    elif family == "cheby2":
        kwargs["rs"] = rs
    elif family == "ellip":
        kwargs["rp"] = rp
        kwargs["rs"] = rs

    sos = iirfilter(order, Wn, btype=resp_type, **kwargs)
    y = sosfiltfilt(sos, x)
    return y

def estimate_spo2(red_raw, ir_raw, red_filt, ir_filt):
    """
    Same simple rule-of-thumb as your script:
    R = (AC_red/DC_red) / (AC_ir/DC_ir), then quadratic.
    AC amplitude via peak-to-peak/2, DC via mean(raw).
    """
    dc_red = float(np.mean(red_raw))
    dc_ir  = float(np.mean(ir_raw))
    ac_red_amp = float(np.ptp(red_filt) / 2.0)
    ac_ir_amp  = float(np.ptp(ir_filt)  / 2.0)

    if min(dc_red, dc_ir, ac_red_amp, ac_ir_amp) <= 0:
        return None, None
    R = (ac_red_amp / dc_red) / (ac_ir_amp / dc_ir)
    spo2 = -45.06 * R**2 + 30.354 * R + 94.845
    return spo2, R

# --------------------
# App UI
# --------------------
app = dash.Dash(__name__)
app.title = "PPG Filter Explorer (10k rows cap)"

app.layout = html.Div(className="container", children=[
    html.H2("PPG Filter Explorer (first 10,000 rows)"),
    html.Div(style={"display": "flex", "gap": "24px", "flexWrap": "wrap"}, children=[

        html.Div(style={"minWidth": "260px"}, children=[
            html.Label("Sampling frequency (Hz)"),
            dcc.Input(id="fs", type="number", value=FS_DEFAULT, step=0.5, min=1, debounce=True, style={"width": "100%"}),

            html.Br(), html.Br(),
            html.Label("Filter family"),
            dcc.Dropdown(
                id="family",
                value="butter",
                options=[
                    {"label": "Butterworth", "value": "butter"},
                    {"label": "Chebyshev I", "value": "cheby1"},
                    {"label": "Chebyshev II", "value": "cheby2"},
                    {"label": "Elliptic", "value": "ellip"},
                    {"label": "Bessel", "value": "bessel"},
                ],
                clearable=False
            ),

            html.Br(),
            html.Label("Response type"),
            dcc.Dropdown(
                id="resp",
                value="bandpass",
                options=[
                    {"label": "Bandpass", "value": "bandpass"},
                    {"label": "Bandstop (Notch)", "value": "bandstop"},
                    {"label": "Lowpass", "value": "lowpass"},
                    {"label": "Highpass", "value": "highpass"},
                ],
                clearable=False
            ),

            html.Br(),
            html.Label("Cutoff frequencies (Hz)"),
            html.Div("For LP: use High only. For HP: use Low only."),
            html.Div(style={"display": "grid", "gridTemplateColumns": "1fr 1fr", "gap": "8px"}, children=[
                html.Div(children=[
                    html.Div("Low"),
                    dcc.Input(id="low_hz", type="number", value=0.5, step=0.1, min=0.0, debounce=True, style={"width": "100%"})
                ]),
                html.Div(children=[
                    html.Div("High"),
                    dcc.Input(id="high_hz", type="number", value=5.0, step=0.1, min=0.1, debounce=True, style={"width": "100%"})
                ]),
            ]),

            html.Br(),
            html.Label("Order"),
            dcc.Slider(id="order", min=1, max=8, step=1, value=2,
                       marks={i: str(i) for i in range(1, 9)}),

            html.Br(),
            html.Label("Ripple params (only for Chebyshev/Elliptic)"),
            html.Div(style={"display": "grid", "gridTemplateColumns": "1fr 1fr", "gap": "8px"}, children=[
                html.Div(children=[
                    html.Div("rp (dB, passband)"),
                    dcc.Input(id="rp", type="number", value=1.0, step=0.1, min=0.01, debounce=True, style={"width": "100%"})
                ]),
                html.Div(children=[
                    html.Div("rs (dB, stopband)"),
                    dcc.Input(id="rs", type="number", value=40.0, step=1, min=10, debounce=True, style={"width": "100%"})
                ]),
            ]),

            html.Br(),
            dcc.Checklist(
                id="invert",
                options=[{"label": "Invert filtered AC (pulse up)", "value": "invert"}],
                value=["invert"]
            ),
        ]),

        html.Div(style={"flex": "1", "minWidth": "360px"}, children=[
            html.Div(id="metrics", style={"padding": "8px 12px", "border": "1px solid #ddd", "borderRadius": "8px", "marginBottom": "10px"}),
            dcc.Graph(id="fig_raw", style={"height": "360px"}),
            dcc.Graph(id="fig_ac", style={"height": "360px"}),
        ]),
    ]),

    html.Div(style={"marginTop": "6px", "color": "#666"}, children=f"Loaded {len(df)} rows from {CSV_FILE}")
])

# --------------------
# Callbacks
# --------------------
@app.callback(
    Output("fig_raw", "figure"),
    Output("fig_ac", "figure"),
    Output("metrics", "children"),
    Input("fs", "value"),
    Input("family", "value"),
    Input("resp", "value"),
    Input("low_hz", "value"),
    Input("high_hz", "value"),
    Input("order", "value"),
    Input("rp", "value"),
    Input("rs", "value"),
    Input("invert", "value"),
)
def update_plots(fs, family, resp, low_hz, high_hz, order, rp, rs, invert_flags):
    fs = float(fs or FS_DEFAULT)
    low_hz = float(low_hz or 0.5)
    high_hz = float(high_hz or 5.0)
    order = int(order or 2)
    rp = float(rp or 1.0)
    rs = float(rs or 40.0)
    invert = "invert" in (invert_flags or [])

    t = np.arange(len(df)) / fs

    # RAW figure
    fig_raw = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.08,
                            subplot_titles=("Raw RED PPG", "Raw IR PPG"))
    fig_raw.add_trace(go.Scatter(x=t, y=red, name="Raw RED", mode="lines"), 1, 1)
    fig_raw.add_trace(go.Scatter(x=t, y=ir,  name="Raw IR",  mode="lines"), 2, 1)
    fig_raw.update_xaxes(title_text="Time (s)", row=2, col=1, rangeslider={"visible": True})
    fig_raw.update_yaxes(title_text="ADC", row=1, col=1)
    fig_raw.update_yaxes(title_text="ADC", row=2, col=1)
    fig_raw.update_layout(title=f"PPG Raw Waveforms (first {len(df)} rows)", hovermode="x unified", height=360)

    # FILTERED figure
    # Be resilient to invalid combos
    try:
        red_ac = design_and_apply_filter(red, fs, family, resp, low_hz, high_hz, order, rp, rs)
        ir_ac  = design_and_apply_filter(ir,  fs, family, resp, low_hz, high_hz, order, rp, rs)
        if invert:
            red_ac = -red_ac
            ir_ac  = -ir_ac

        fig_ac = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.08,
                               subplot_titles=(f"Filtered RED ({family}, {resp})",
                                               f"Filtered IR ({family}, {resp})"))
        fig_ac.add_trace(go.Scatter(x=t, y=red_ac, name="AC RED", mode="lines"), 1, 1)
        fig_ac.add_trace(go.Scatter(x=t, y=ir_ac,  name="AC IR",  mode="lines"), 2, 1)
        fig_ac.update_xaxes(title_text="Time (s)", row=2, col=1, rangeslider={"visible": True})
        fig_ac.update_yaxes(title_text="Amplitude", row=1, col=1)
        fig_ac.update_yaxes(title_text="Amplitude", row=2, col=1)
        fig_ac.update_layout(title=f"PPG Filtered Waveforms (order {order})", hovermode="x unified", height=360)

        # SpO2 estimate
        spo2, R = estimate_spo2(red, ir, red_ac, ir_ac)
        met = (f"Estimated SpO₂: {spo2:.2f}% (R={R:.4f})"
               if (spo2 is not None) else "Estimated SpO₂: n/a (insufficient AC/DC)")
    except Exception as e:
        fig_ac = go.Figure()
        fig_ac.update_layout(title=f"Filter error: {e}")
        met = f"Filter error: {e}"

    # Metrics panel
    metrics = html.Div([
        html.Div(f"Filter: {family} • {resp} • order={order} • fs={fs:g} Hz"),
        html.Div(f"Low={low_hz:g} Hz, High={high_hz:g} Hz • rp={rp:g} dB, rs={rs:g} dB • invert={invert}"),
        html.Strong(met),
    ])

    return fig_raw, fig_ac, metrics

# --------------------
# Main
# --------------------
if __name__ == "__main__":
    app.run_server(debug=True)
