In [13]:
import os
import numpy as np
import pandas as pd
import antropy as ant
import neurokit2 as nk
import heartpy as hp


def calculate_bvp_features(signal: np.ndarray, sampling_rate: int) -> dict:
    """
    Calculates time-domain, frequency-domain, and non-linear features 
    from a BVP/PPG signal.

    Args:
        signal (np.ndarray): A 1D numpy array containing the BVP/PPG signal.
        sampling_rate (int): The sampling rate of the signal in Hz.

    Returns:
        dict: A dictionary containing the calculated features. Returns an empty 
              dictionary if processing fails or insufficient peaks are found.
    """
    
    features = {}

    # --- Basic Signal Processing and Peak Detection ---
    try:
        # Process the signal to find peaks
        # clean=False might be needed if default cleaning removes too much
        signals, info = nk.ppg_process(signal, sampling_rate=sampling_rate) 
        peaks = info["PPG_Peaks"]

        # Ensure enough peaks are found for reliable HRV analysis (e.g., > 5 peaks for basic stats)
        if len(peaks) < 5:
            print(f"Warning: Insufficient peaks ({len(peaks)}) detected for reliable analysis.")
            return {} # Return empty dict if not enough peaks

        # Calculate NN intervals (IBIs) in milliseconds
        nn_intervals = np.diff(peaks) / sampling_rate * 1000 
        
        # Artifact correction might be needed here for robust results, 
        # but NeuroKit's ppg_process includes some cleaning. 
        # For advanced use, consider nk.hrv_clean or similar methods. [8]

    except Exception as e:
        print(f"Error during PPG processing or peak detection: {e}")
        return {}

    # --- Calculate HRV Features using NeuroKit2 ---
    try:
        # Calculate all HRV domain features at once
        hrv_indices = nk.hrv(peaks, sampling_rate=sampling_rate, show=False)
        
        # Calculate multiscale entropy separately if needed (nk.hrv doesn't include it by default)
        # Note: MSE can be computationally intensive and requires sufficient data length.
        mse_val = nk.entropy_multiscale(nn_intervals, show=False) 
        # mse_val is often reported as a single summary statistic (e.g., Area), 
        # or as values across scales. We'll store the Area if available.

    except Exception as e:
        print(f"Error during HRV feature calculation: {e}")
        # Proceed with BR if possible, but HRV features will be missing
        hrv_indices = pd.DataFrame() # Empty dataframe
        mse_val = np.nan


    # --- Calculate Breathing Rate (BR) ---
    # BR estimation from PPG is complex and can be less reliable [2]. 
    # NeuroKit offers methods to estimate RSP signal from PPG.
    try:
        # Estimate Respiratory Rate using the processed PPG signal (signals['PPG_Clean'])
        rsp_signal = nk.ppg_rsp(signals['PPG_Clean'], sampling_rate=sampling_rate)
        rsp_info = nk.rsp_process(rsp_signal, sampling_rate=sampling_rate)
        # Take the mean respiratory rate over the recording
        breathing_rate = rsp_info[0]['RSP_Rate'].mean() 
    except Exception as e:
        print(f"Warning: Could not calculate Breathing Rate: {e}")
        breathing_rate = np.nan

    # --- Populate the features dictionary ---
    
    # Time-domain
    features['HR'] = hrv_indices['HRV_MeanHR'].iloc[0] if 'HRV_MeanHR' in hrv_indices else np.nan # Heart Rate (average) [1, 10]
    features['BR'] = breathing_rate # Breathing Rate [2]
    features['IBI'] = hrv_indices['HRV_MeanNN'].iloc[0] if 'HRV_MeanNN' in hrv_indices else np.nan # Interbeat interval (average NN) [3, 10]
    features['pNN20'] = hrv_indices['HRV_pNN20'].iloc[0] if 'HRV_pNN20' in hrv_indices else np.nan # [3]
    features['pNN50'] = hrv_indices['HRV_pNN50'].iloc[0] if 'HRV_pNN50' in hrv_indices else np.nan # [3]
    features['SDNN'] = hrv_indices['HRV_SDNN'].iloc[0] if 'HRV_SDNN' in hrv_indices else np.nan # [3]
    features['RMSSD'] = hrv_indices['HRV_RMSSD'].iloc[0] if 'HRV_RMSSD' in hrv_indices else np.nan # [3]
    features['SDSD'] = hrv_indices['HRV_SDSD'].iloc[0] if 'HRV_SDSD' in hrv_indices else np.nan # [3]
    features['NN50'] = hrv_indices['HRV_NN50'].iloc[0] if 'HRV_NN50' in hrv_indices else np.nan # NN50 count [3]
    features['HRV_triangular_index'] = hrv_indices['HRV_HTI'].iloc[0] if 'HRV_HTI' in hrv_indices else np.nan # [4]
    features['TINN'] = hrv_indices['HRV_TINN'].iloc[0] if 'HRV_TINN' in hrv_indices else np.nan # [4]

    # Frequency-domain (units ms^2) [5]
    features['VLF_power'] = hrv_indices['HRV_VLF'].iloc[0] if 'HRV_VLF' in hrv_indices else np.nan # Typically 0.0033–0.04 Hz [5]
    features['LF_power'] = hrv_indices['HRV_LF'].iloc[0] if 'HRV_LF' in hrv_indices else np.nan   # Typically 0.04–0.15 Hz [5]
    features['HF_power'] = hrv_indices['HRV_HF'].iloc[0] if 'HRV_HF' in hrv_indices else np.nan   # Typically 0.15–0.4 Hz [5]
    features['LF_HF_ratio'] = hrv_indices['HRV_LFHF'].iloc[0] if 'HRV_LFHF' in hrv_indices else np.nan # [5]
    features['Total_power'] = hrv_indices['HRV_TotalPower'].iloc[0] if 'HRV_TotalPower' in hrv_indices else np.nan # Sum of VLF, LF, HF [5]

    # Non-linear
    features['Poincare_SD1'] = hrv_indices['HRV_SD1'].iloc[0] if 'HRV_SD1' in hrv_indices else np.nan # Short-term variability [6, 11]
    features['Poincare_SD2'] = hrv_indices['HRV_SD2'].iloc[0] if 'HRV_SD2' in hrv_indices else np.nan # Long-term variability [6, 11]
    features['Sample_entropy'] = hrv_indices['HRV_SampEn'].iloc[0] if 'HRV_SampEn' in hrv_indices else np.nan # [7, 11]
    features['DFA_alpha1'] = hrv_indices['HRV_DFA_alpha1'].iloc[0] if 'HRV_DFA_alpha1' in hrv_indices else np.nan # Short-term fractal scaling exponent [8, 11]
    features['DFA_alpha2'] = hrv_indices['HRV_DFA_alpha2'].iloc[0] if 'HRV_DFA_alpha2' in hrv_indices else np.nan # Long-term fractal scaling exponent (requires longer signal) [8, 11]
    # Extracting a single value from MSE output (e.g., Area under the MSE curve)
    features['Multiscale_entropy'] = mse_val['MSE_Area'] if isinstance(mse_val, dict) and 'MSE_Area' in mse_val else np.nan # [9]
    
    # Clean up NaNs potentially introduced by failed calculations within NeuroKit
    features = {k: (None if pd.isna(v) else v) for k, v in features.items()}

    return features

In [None]:
def calculate_bvp_features(signal: np.ndarray, sampling_rate: int) -> dict:
    """
    Calculates time-domain, frequency-domain, and non-linear features 
    from a BVP/PPG signal.

    Args:
        signal (np.ndarray): A 1D numpy array containing the BVP/PPG signal.
        sampling_rate (int): The sampling rate of the signal in Hz.

    Returns:
        dict: A dictionary containing the calculated features. Returns an empty 
              dictionary if processing fails or insufficient peaks are found.
    """
    
    features = {}

    # --- Basic Signal Processing and Peak Detection ---
    # try:
    # Process the signal to find peaks
    # clean=False might be needed if default cleaning removes too much
    signals, info = nk.ppg_process(signal, sampling_rate=sampling_rate) 
    peaks = info["PPG_Peaks"]

    # Ensure enough peaks are found for reliable HRV analysis (e.g., > 5 peaks for basic stats)
    if len(peaks) < 5:
        print(f"Warning: Insufficient peaks ({len(peaks)}) detected for reliable analysis.")
        return {} # Return empty dict if not enough peaks

    # Calculate NN intervals (IBIs) in milliseconds
    nn_intervals = np.diff(peaks) / sampling_rate * 1000 
    
    # Artifact correction might be needed here for robust results, 
    # but NeuroKit's ppg_process includes some cleaning. 
    # For advanced use, consider nk.hrv_clean or similar methods. [8]

    # except Exception as e:
    #     print(f"Error during PPG processing or peak detection: {e}")
    #     return {}

    # --- Calculate HRV Features using NeuroKit2 ---
    # try:
    # Calculate all HRV domain features at once
    hrv_indices = nk.hrv(peaks, sampling_rate=sampling_rate, show=False)
    
    # Calculate multiscale entropy separately if needed (nk.hrv doesn't include it by default)
    # Note: MSE can be computationally intensive and requires sufficient data length.
    mse_val = nk.entropy_multiscale(nn_intervals, show=False) 
    # mse_val is often reported as a single summary statistic (e.g., Area), 
    # or as values across scales. We'll store the Area if available.

    # except Exception as e:
    #     print(f"Error during HRV feature calculation: {e}")
    #     # Proceed with BR if possible, but HRV features will be missing
    #     hrv_indices = pd.DataFrame() # Empty dataframe
    #     mse_val = np.nan


    # --- Calculate Breathing Rate (BR) ---
    # BR estimation from PPG is complex and can be less reliable [2]. 
    # NeuroKit offers methods to estimate RSP signal from PPG.
    # try:
    # Estimate Respiratory Rate using the processed PPG signal (signals['PPG_Clean'])
    rsp_signal = nk.ppg_rsp(signals['PPG_Clean'], sampling_rate=sampling_rate)
    rsp_info = nk.rsp_process(rsp_signal, sampling_rate=sampling_rate)
    # Take the mean respiratory rate over the recording
    breathing_rate = rsp_info[0]['RSP_Rate'].mean() 
    # except Exception as e:
    #     print(f"Warning: Could not calculate Breathing Rate: {e}")
    #     breathing_rate = np.nan

    # --- Populate the features dictionary ---
    
    # Time-domain
    features['HR'] = hrv_indices['HRV_MeanHR'].iloc[0] if 'HRV_MeanHR' in hrv_indices else np.nan # Heart Rate (average) [1, 10]
    features['BR'] = breathing_rate # Breathing Rate [2]
    features['IBI'] = hrv_indices['HRV_MeanNN'].iloc[0] if 'HRV_MeanNN' in hrv_indices else np.nan # Interbeat interval (average NN) [3, 10]
    features['pNN20'] = hrv_indices['HRV_pNN20'].iloc[0] if 'HRV_pNN20' in hrv_indices else np.nan # [3]
    features['pNN50'] = hrv_indices['HRV_pNN50'].iloc[0] if 'HRV_pNN50' in hrv_indices else np.nan # [3]
    features['SDNN'] = hrv_indices['HRV_SDNN'].iloc[0] if 'HRV_SDNN' in hrv_indices else np.nan # [3]
    features['RMSSD'] = hrv_indices['HRV_RMSSD'].iloc[0] if 'HRV_RMSSD' in hrv_indices else np.nan # [3]
    features['SDSD'] = hrv_indices['HRV_SDSD'].iloc[0] if 'HRV_SDSD' in hrv_indices else np.nan # [3]
    features['NN50'] = hrv_indices['HRV_NN50'].iloc[0] if 'HRV_NN50' in hrv_indices else np.nan # NN50 count [3]
    features['HRV_triangular_index'] = hrv_indices['HRV_HTI'].iloc[0] if 'HRV_HTI' in hrv_indices else np.nan # [4]
    features['TINN'] = hrv_indices['HRV_TINN'].iloc[0] if 'HRV_TINN' in hrv_indices else np.nan # [4]

    # Frequency-domain (units ms^2) [5]
    features['VLF_power'] = hrv_indices['HRV_VLF'].iloc[0] if 'HRV_VLF' in hrv_indices else np.nan # Typically 0.0033–0.04 Hz [5]
    features['LF_power'] = hrv_indices['HRV_LF'].iloc[0] if 'HRV_LF' in hrv_indices else np.nan   # Typically 0.04–0.15 Hz [5]
    features['HF_power'] = hrv_indices['HRV_HF'].iloc[0] if 'HRV_HF' in hrv_indices else np.nan   # Typically 0.15–0.4 Hz [5]
    features['LF_HF_ratio'] = hrv_indices['HRV_LFHF'].iloc[0] if 'HRV_LFHF' in hrv_indices else np.nan # [5]
    features['Total_power'] = hrv_indices['HRV_TotalPower'].iloc[0] if 'HRV_TotalPower' in hrv_indices else np.nan # Sum of VLF, LF, HF [5]

    # Non-linear
    features['Poincare_SD1'] = hrv_indices['HRV_SD1'].iloc[0] if 'HRV_SD1' in hrv_indices else np.nan # Short-term variability [6, 11]
    features['Poincare_SD2'] = hrv_indices['HRV_SD2'].iloc[0] if 'HRV_SD2' in hrv_indices else np.nan # Long-term variability [6, 11]
    features['Sample_entropy'] = hrv_indices['HRV_SampEn'].iloc[0] if 'HRV_SampEn' in hrv_indices else np.nan # [7, 11]
    features['DFA_alpha1'] = hrv_indices['HRV_DFA_alpha1'].iloc[0] if 'HRV_DFA_alpha1' in hrv_indices else np.nan # Short-term fractal scaling exponent [8, 11]
    features['DFA_alpha2'] = hrv_indices['HRV_DFA_alpha2'].iloc[0] if 'HRV_DFA_alpha2' in hrv_indices else np.nan # Long-term fractal scaling exponent (requires longer signal) [8, 11]
    # Extracting a single value from MSE output (e.g., Area under the MSE curve)
    features['Multiscale_entropy'] = mse_val['MSE_Area'] if isinstance(mse_val, dict) and 'MSE_Area' in mse_val else np.nan # [9]
    
    # Clean up NaNs potentially introduced by failed calculations within NeuroKit
    features = {k: (None if pd.isna(v) else v) for k, v in features.items()}

    return features

# def calculate_bvp_features(signal: np.ndarray, sampling_rate: int) -> dict:
#     """
#     Calculates time-domain, frequency-domain, and non-linear features 
#     from a BVP/PPG signal, handling potential errors for short signals.

#     Args:
#         signal (np.ndarray): A 1D numpy array containing the BVP/PPG signal.
#         sampling_rate (int): The sampling rate of the signal in Hz.

#     Returns:
#         dict: A dictionary containing the calculated features. Returns features 
#               set to None/NaN if calculation fails (e.g., due to short signal).
#     """
    
#     features = {}
#     # Initialize all expected keys to None for consistent output structure
#     feature_keys = [
#         'HR', 'BR', 'IBI', 'pNN20', 'pNN50', 'SDNN', 'RMSSD', 'SDSD', 
#         'NN50', 'HRV_triangular_index', 'TINN', 'VLF_power', 'LF_power', 
#         'HF_power', 'LF_HF_ratio', 'Total_power', 'Poincare_SD1', 
#         'Poincare_SD2', 'Sample_entropy', 'DFA_alpha1', 'DFA_alpha2', 
#         'Multiscale_entropy'
#     ]
#     features = {key: None for key in feature_keys}
    
#     hrv_indices = pd.DataFrame() # Initialize empty dataframe
#     nn_intervals = np.array([])  # Initialize empty array
#     peaks = np.array([])        # Initialize empty array
    
#     # --- Basic Signal Processing and Peak Detection ---
#     try:
#         with warnings.catch_warnings(): # Suppress specific runtime warnings if desired
#             warnings.simplefilter("ignore", category=RuntimeWarning) 
#             signals, info = nk.ppg_process(signal, sampling_rate=sampling_rate)
#         peaks = info["PPG_Peaks"]

#         if len(peaks) < 5:
#             print(f"Warning: Insufficient peaks ({len(peaks)}) detected for reliable analysis.")
#             # Return dictionary with None values
#             return features 

#         # Calculate NN intervals (IBIs) in milliseconds
#         nn_intervals = np.diff(peaks) / sampling_rate * 1000 
        
#         # Check if enough NN intervals exist for basic HRV stats (at least 2 for diff)
#         if len(nn_intervals) < 2:
#              print(f"Warning: Insufficient NN intervals ({len(nn_intervals)}) for HRV analysis.")
#              # Still attempt BR, but return features dict (mostly None)
#              hrv_indices = pd.DataFrame() # Ensure it's empty
#         else:
#              # --- Calculate HRV Features using NeuroKit2 ---
#              try:
#                  # Suppress potential RuntimeWarnings from stats calculations on short series
#                  with warnings.catch_warnings():
#                      warnings.simplefilter("ignore", category=RuntimeWarning)
#                      hrv_indices = nk.hrv(peaks, sampling_rate=sampling_rate, show=False)
             
#              except ValueError as e:
#                  # Catch the specific DFA window size error
#                  if "the window cannot contain more data points" in str(e):
#                      print(f"Warning: DFA calculation failed due to short NN interval series ({len(nn_intervals)} intervals). DFA features set to None.")
#                      # Try calculating other domains separately if hrv() failed completely
#                      try:
#                          hrv_time_freq = nk.hrv_time(peaks, sampling_rate=sampling_rate)
#                          hrv_time_freq = hrv_time_freq.join(nk.hrv_frequency(peaks, sampling_rate=sampling_rate))
#                          # Attempt Poincare/SampEn if possible (less sensitive to length than DFA)
#                          hrv_nonlinear_partial = nk.hrv_nonlinear(peaks, sampling_rate=sampling_rate, D変動=False) # Assuming D変動 param exists or adapt
#                          # Manually set DFA keys to avoid KeyErrors later
#                          hrv_nonlinear_partial['HRV_DFA_alpha1'] = np.nan
#                          hrv_nonlinear_partial['HRV_DFA_alpha2'] = np.nan
#                          hrv_indices = hrv_time_freq.join(hrv_nonlinear_partial)

#                      except Exception as inner_e:
#                          print(f"Warning: Could not salvage HRV features after DFA error: {inner_e}")
#                          hrv_indices = pd.DataFrame() # Reset if partial calc fails
#                  else:
#                      # Re-raise other unexpected ValueErrors
#                      raise e
#              except Exception as e:
#                  print(f"Error during HRV feature calculation: {e}")
#                  hrv_indices = pd.DataFrame() # Reset on other errors

#     except Exception as e:
#         print(f"Error during PPG processing or peak detection: {e}")
#         # Return dictionary with None values
#         return features

#     # --- Calculate Breathing Rate (BR) ---
#     breathing_rate = None
#     try:
#         # Ensure signals were processed
#         if 'PPG_Clean' in signals:
#              # Suppress potential RuntimeWarnings during RSP processing
#              with warnings.catch_warnings():
#                  warnings.simplefilter("ignore", category=RuntimeWarning)
#                  rsp_signal = nk.ppg_rsp(signals['PPG_Clean'], sampling_rate=sampling_rate)
#                  # Need enough data for RSP processing
#                  if len(rsp_signal) > sampling_rate * 4: # Heuristic: need a few seconds
#                      rsp_info, _ = nk.rsp_process(rsp_signal, sampling_rate=sampling_rate)
#                      # Check if RSP_Rate exists and is not empty
#                      if 'RSP_Rate' in rsp_info and not rsp_info['RSP_Rate'].empty:
#                           breathing_rate = rsp_info['RSP_Rate'].mean()
#                      else:
#                           print("Warning: RSP rate could not be determined.")
#                  else:
#                      print("Warning: Signal too short for reliable RSP processing.")
#         else:
#              print("Warning: Clean PPG signal not available for BR calculation.")

#     except Exception as e:
#         print(f"Warning: Could not calculate Breathing Rate: {e}")
#         # breathing_rate remains None

#     # --- Calculate Multiscale Entropy (MSE) ---
#     mse_val_area = None
#     if len(nn_intervals) > 10: # MSE requires a minimum number of intervals
#         try:
#             # Suppress potential RuntimeWarnings
#             with warnings.catch_warnings():
#                  warnings.simplefilter("ignore", category=RuntimeWarning)
#                  mse_results = nk.entropy_multiscale(nn_intervals, show=False, dimension=2) # Specify dimension
#             # Extract the area metric if available
#             if isinstance(mse_results, dict) and 'MSE_Area' in mse_results:
#                  mse_val_area = mse_results['MSE_Area']
#             elif isinstance(mse_results, tuple) and len(mse_results) > 0 and isinstance(mse_results[0], dict) and 'MSE_Area' in mse_results[0]:
#                  mse_val_area = mse_results[0]['MSE_Area'] # Adapt if structure changes
#         except Exception as e:
#             print(f"Warning: Could not calculate Multiscale Entropy: {e}")
#     else:
#         print(f"Warning: Skipping MSE calculation due to insufficient NN intervals ({len(nn_intervals)}).")


#     # --- Populate the features dictionary safely ---
#     # Use .get() with default None or access DataFrame safely
    
#     features['HR'] = hrv_indices.get('HRV_MeanHR', pd.Series([None])).iloc[0]
#     features['BR'] = breathing_rate 
#     features['IBI'] = hrv_indices.get('HRV_MeanNN', pd.Series([None])).iloc[0]
#     features['pNN20'] = hrv_indices.get('HRV_pNN20', pd.Series([None])).iloc[0] 
#     features['pNN50'] = hrv_indices.get('HRV_pNN50', pd.Series([None])).iloc[0] 
#     features['SDNN'] = hrv_indices.get('HRV_SDNN', pd.Series([None])).iloc[0] 
#     features['RMSSD'] = hrv_indices.get('HRV_RMSSD', pd.Series([None])).iloc[0]
#     features['SDSD'] = hrv_indices.get('HRV_SDSD', pd.Series([None])).iloc[0] 
#     features['NN50'] = hrv_indices.get('HRV_NN50', pd.Series([None])).iloc[0] 
#     features['HRV_triangular_index'] = hrv_indices.get('HRV_HTI', pd.Series([None])).iloc[0]
#     features['TINN'] = hrv_indices.get('HRV_TINN', pd.Series([None])).iloc[0] 

#     features['VLF_power'] = hrv_indices.get('HRV_VLF', pd.Series([None])).iloc[0] 
#     features['LF_power'] = hrv_indices.get('HRV_LF', pd.Series([None])).iloc[0]  
#     features['HF_power'] = hrv_indices.get('HRV_HF', pd.Series([None])).iloc[0]  
#     features['LF_HF_ratio'] = hrv_indices.get('HRV_LFHF', pd.Series([None])).iloc[0] 
#     features['Total_power'] = hrv_indices.get('HRV_TotalPower', pd.Series([None])).iloc[0] 

#     features['Poincare_SD1'] = hrv_indices.get('HRV_SD1', pd.Series([None])).iloc[0] 
#     features['Poincare_SD2'] = hrv_indices.get('HRV_SD2', pd.Series([None])).iloc[0] 
#     features['Sample_entropy'] = hrv_indices.get('HRV_SampEn', pd.Series([None])).iloc[0] 
#     features['DFA_alpha1'] = hrv_indices.get('HRV_DFA_alpha1', pd.Series([None])).iloc[0] # Will be None if error was caught
#     features['DFA_alpha2'] = hrv_indices.get('HRV_DFA_alpha2', pd.Series([None])).iloc[0] # Will be None if error was caught
#     features['Multiscale_entropy'] = mse_val_area
    
#     # Final check for NaNs introduced by calculations and convert to None
#     for k, v in features.items():
#         if pd.isna(v):
#             features[k] = None
            
#     return features


  freqs, _, Pxy = _spectral_helper(x, y, fs, window, nperseg, noverlap,
ERROR:root:Error processing /home/abhaygupta/workspaceVM/capstone/rPPG-Toolbox/PreprocessedData/dummyFolder/101_label0.npy: NeuroKit error: the window cannot contain more data points than the time series. Decrease 'scale'.


Dataset created with 0 files and -1 features
