# Test Your Algorithm

## Instructions
1. From the **Pulse Rate Algorithm** Notebook you can do one of the following:
   - Copy over all the **Code** section to the following Code block.
   - Download as a Python (`.py`) and copy the code to the following Code block.
2. In the bottom right, click the <span style="color:blue">Test Run</span> button. 

### Didn't Pass
If your code didn't pass the test, go back to the previous Concept or to your local setup and continue iterating on your algorithm and try to bring your training error down before testing again.

### Pass
If your code passes the test, complete the following! You **must** include a screenshot of your code and the Test being **Passed**. Here is what the starter filler code looks like when the test is run and should be similar. A passed test will include in the notebook a green outline plus a box with **Test passed:** and in the Results bar at the bottom the progress bar will be at 100% plus a checkmark with **All cells passed**.
![Example](example.png)

1. Take a screenshot of your code passing the test, make sure it is in the format `.png`. If not a `.png` image, you will have to edit the Markdown render the image after Step 3. Here is an example of what the `passed.png` would look like 
2. Upload the screenshot to the same folder or directory as this jupyter notebook.
3. Rename the screenshot to `passed.png` and it should show up below.
![Passed](passed.png)
4. Download this jupyter notebook as a `.pdf` file. 
5. Continue to Part 2 of the Project. 

In [1]:
# replace the code below with your pulse rate algorithm.
import glob
import numpy as np
import scipy as sp
import scipy.io
from scipy.signal import butter, filtfilt
import matplotlib.pyplot as plt

def LoadTroikaDataset():
    """
    Retrieve the .mat filenames for the troika dataset.

    Returns:
        data_fls: Names of the .mat files that contain signal data
        ref_fls: Names of the .mat files that contain reference data
    """
    data_dir = "/content/nd320-c4-wearable-data-project-starter/part_1/datasets/troika/training_data"  # adapte ce chemin selon ton environnement
    data_fls = sorted(glob.glob(data_dir + "/DATA_*.mat"))
    ref_fls = sorted(glob.glob(data_dir + "/REF_*.mat"))
    return data_fls, ref_fls

def LoadTroikaDataFile(data_fl):
    """
    Loads and extracts signals from a troika data file.

    Args:
        data_fl: (str) filepath to a troika .mat file.

    Returns:
        numpy arrays for ppg, accx, accy, accz signals.
    """
    data = sp.io.loadmat(data_fl)['sig']
    return data[2], data[0], data[1], data[3]  # ppg, accx, accy, accz

def bandpass_filter(signal, fs, lowcut=0.67, highcut=4.0, order=4):
    """
    Applique un filtre passe-bande Butterworth.

    Args:
        signal (np.array): signal à filtrer
        fs (float): fréquence d'échantillonnage (Hz)
        lowcut (float): fréquence basse (Hz)
        highcut (float): fréquence haute (Hz)
        order (int): ordre du filtre

    Returns:
        np.array: signal filtré
    """
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    filtered_signal = filtfilt(b, a, signal)
    return filtered_signal

def AggregateErrorMetric(pr_errors, confidence_est):
    """
    Computes an aggregate error metric based on confidence estimates.

    Computes the MAE at 90% availability.

    Args:
        pr_errors: numpy array of pulse rate estimation errors
        confidence_est: numpy array of confidence estimates

    Returns:
        MAE at 90% availability
    """
    percentile90_confidence = np.percentile(confidence_est, 10)
    best_estimates = pr_errors[confidence_est >= percentile90_confidence]
    return np.mean(np.abs(best_estimates))

def RunPulseRateAlgorithm(data_fl, ref_fl):
    """
    Calculate heart rate error and confidence on a data file.
    Enhanced version with accelerometer consideration for confidence.

    Args:
        data_fl (str): path to the data file (.mat)
        ref_fl (str): path to the reference file (.mat)

    Returns:
        Tuple (errors, confidence) numpy arrays
    """
    # Load PPG and accelerometer data
    ppg, accx, accy, accz = LoadTroikaDataFile(data_fl)
    fs = 125  # sampling frequency in Hz

    # Basic check: ensure signal length is sufficient for windowing
    if len(ppg) < 8 * fs:
        raise ValueError("Signal length is too short for 8-second window processing.")

    # Bandpass filter the PPG signal (40-240 BPM)
    ppg_filtered = bandpass_filter(ppg, fs)
    
    # Compute accelerometer norm and apply bandpass filter
    acc_norm = np.sqrt(accx**2 + accy**2 + accz**2)
    acc_filtered = bandpass_filter(acc_norm, fs)

    window_size = 8 * fs  # 8-second window
    step_size = 2 * fs    # 2-second step
    n_windows = (len(ppg_filtered) - window_size) // step_size + 1

    pulse_rates = []
    confidences = []

    # Load reference heart rate data
    ref_data = sp.io.loadmat(ref_fl)['BPM0'].flatten()
    previous_bpm = None  # track previous heart rate for physiological constraint

    for i in range(n_windows):
        start = i * step_size
        end = start + window_size

        # Extract current PPG window
        window_ppg = ppg_filtered[start:end]
        
        # Compute FFT of PPG window
        freqs = np.fft.rfftfreq(len(window_ppg), 1/fs)
        fft_mag = np.abs(np.fft.rfft(window_ppg))
        
        # Restrict to physiological frequency band (0.67-4 Hz ~ 40-240 BPM)
        valid_idx = np.where((freqs >= 0.67) & (freqs <= 4.0))
        freqs_valid = freqs[valid_idx]
        mag_valid = fft_mag[valid_idx]

        # Detect dominant frequency in PPG signal
        peak_idx = np.argmax(mag_valid)
        peak_freq = freqs_valid[peak_idx]

        # Extract accelerometer window and compute FFT
        window_acc = acc_filtered[start:end]
        acc_fft_mag = np.abs(np.fft.rfft(window_acc))
        acc_freqs = np.fft.rfftfreq(len(window_acc), 1/fs)
        
        # Restrict accelerometer FFT to physiological band
        valid_acc_idx = np.where((acc_freqs >= 0.67) & (acc_freqs <= 4.0))
        acc_freqs_valid = acc_freqs[valid_acc_idx]
        acc_mag_valid = acc_fft_mag[valid_acc_idx]

        # Detect dominant frequency in accelerometer signal
        acc_peak_idx = np.argmax(acc_mag_valid)
        acc_peak_freq = acc_freqs_valid[acc_peak_idx]

        # Convert frequency to BPM
        current_bpm = peak_freq * 60

        # Physiological constraint: max 3 BPM change between consecutive windows
        if previous_bpm is not None and abs(current_bpm - previous_bpm) > 3:
            current_bpm = previous_bpm + 3 * np.sign(current_bpm - previous_bpm)

        pulse_rates.append(current_bpm)
        previous_bpm = current_bpm

        # Calculate confidence: energy around peak ±0.1 Hz
        band_width = 0.1
        band_idx = np.where((freqs_valid >= peak_freq - band_width) & (freqs_valid <= peak_freq + band_width))
        energy_band = np.sum(mag_valid[band_idx])
        energy_total = np.sum(mag_valid)
        confidence = energy_band / energy_total if energy_total > 0 else 0

        # Confidence represents the proportion of signal energy concentrated around the detected peak frequency.
        # A higher confidence means the peak is more dominant and the estimate is more reliable.

        # Moderate confidence reduction if PPG and accelerometer frequencies are close (likely motion artifact)
        if abs(peak_freq - acc_peak_freq) < 0.1:
            if confidence < 0.6:  # empirical threshold to avoid penalizing good estimates
                confidence *= 0.7  # gentle reduction

        confidences.append(confidence)

    # Convert results to numpy arrays
    pulse_rates = np.array(pulse_rates)
    confidences = np.array(confidences)

    # Trim reference to match estimation length
    ref_trim = ref_data[:len(pulse_rates)]
    
    # Calculate absolute errors
    errors = np.abs(pulse_rates - ref_trim)

    return errors, confidences




def Evaluate():
    """
    Évalue la performance sur l'ensemble Troika.

    Returns:
        erreur agrégée sur l'ensemble
    """
    data_fls, ref_fls = LoadTroikaDataset()
    errs, confs = [], []
    for data_fl, ref_fl in zip(data_fls, ref_fls):
        errors, confidence = RunPulseRateAlgorithm(data_fl, ref_fl)
        errs.append(errors)
        confs.append(confidence)
    errs = np.hstack(errs)
    confs = np.hstack(confs)
    return AggregateErrorMetric(errs, confs)
