In [2]:
# !pip3 install pandas numpy opencv-python matplotlib scipy requests openpyxl cv2

In [3]:
import cv2
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import butter, filtfilt, savgol_filter
from scipy.fft import fft, fftfreq, ifft
from scipy.signal import find_peaks, argrelmin, argrelmax
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVR
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_percentage_error
import os

In [4]:
DATASET_PATH = 'fv_hb_dataset.csv'
VIDEO_PATH = 'videos/'

# Video Processing
    Block Processing
    Applying Butterworth Filter
    Applying fft
    Plotting blocks
    Extract first block average red
    Plot avgerage red values
    PPG Signal Extraction
    
    

In [5]:
def block_processing(frame_rgb, block_height, block_width):
    """
    Process each block in the frame and calculate the average red value for each block.
    """
    avg_red_values = np.zeros((10, 10), dtype=float)
    for i in range(10):
        for j in range(10):
            block = frame_rgb[i*block_height:(i+1)*block_height, j*block_width:(j+1)*block_width]
            avg_red = np.mean(block[:, :, 0])  # 0 index for the red channel
            avg_red_values[i, j] = avg_red
    return avg_red_values

def butterworth_filter(signal, sampling_rate, lowcut, highcut, order=2):
    """
    Apply Butterworth filter to the signal.
    """
    nyquist_freq = 0.5 * sampling_rate
    low = lowcut / nyquist_freq
    high = highcut / nyquist_freq
    b, a = butter(order, [low, high], btype='band', analog=False)
    filtered_signal = filtfilt(b, a, signal)
    return filtered_signal

def apply_fft(signal, sampling_rate, low_freq=0.5, high_freq=5.0):
    """
    Apply FFT to the signal.
    """
    n = len(signal)
    freq = fftfreq(n, d=1/sampling_rate)
    fft_signal = fft(signal)

    unwanted_indices = (freq < low_freq) | (freq > high_freq)
    fft_signal[unwanted_indices] = 0
    filtered_signal = ifft(fft_signal)
    return filtered_signal.real



def plot_blocks(data):
    """
    Plot PPG signal for each block.
    """
    fig, axs = plt.subplots(10, 10, figsize=(20, 20))
    for i in range(10):
        for j in range(10):
            axs[i, j].plot(data[i, j, :])
            axs[i, j].set_title(f'Block {i}, {j}')
            axs[i, j].set_xticks([])
            axs[i, j].set_yticks([])
    plt.show()


def extract_first_block_avg_red(data):
    """
    Extract the average red values for the first block of each frame.
    """
    first_block_avg_red = []
    for i in range(data.shape[2]):
        avg_red = data[0, 0, i]
        first_block_avg_red.append(avg_red)
    return first_block_avg_red


def plot_avg_red_values(data, title):
    """
    Plot average red values.
    """
    plt.figure(figsize=(15, 4))
    plt.plot(data, label=title)
    plt.xlabel('Frame')
    plt.ylabel('Average Red Value' if 'Original' in title else 'Relative Intensity')
    plt.title(title)
    plt.legend()
    plt.show()



def PPG_Signal_Extraction(video_path):
    """
    Extract PPG signal from the given video.
    """
    cap = cv2.VideoCapture(video_path)
    frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    # print(f'Frame count: {frame_count}, Width: {width}, Height: {height}, FPS: {fps}')

    block_width = width // 10
    block_height = height // 10

    avg_red_values = np.zeros((10, 10, frame_count), dtype=float)

    current_frame = 0
    while cap.isOpened() and current_frame < frame_count:
        ret, frame = cap.read()
        if not ret:
            break
        
        frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        avg_red_values[:, :, current_frame] = block_processing(frame_rgb, block_height, block_width)
        
        current_frame += 1

    cap.release()

    selected_frames = avg_red_values[:, :, 26:276]
    

    print("Raw PPG Signal")
    # plot_blocks(selected_frames)

    Sc = np.zeros_like(selected_frames)
    BF = np.zeros_like(selected_frames)
    for i in range(10):
        for j in range(10):
            signal = selected_frames[i, j, :]
            filtered_signal = butterworth_filter(signal, fps, 0.5, 5.0)
            BF[i, j, :] = filtered_signal
            fft_signal = apply_fft(filtered_signal, fps, 0.5, 5.0)
            Sc[i, j, :] = fft_signal

    SPPG = -1 * Sc

    print("Denoised PPG Signal after Butterworth Filter")
    # plot_blocks(BF)

    print("SPPG Signal after FFT and Butterworth Filter")
    # plot_blocks(SPPG)

    first_block_avg_red = extract_first_block_avg_red(selected_frames)
    first_block_avg_red_denoised = extract_first_block_avg_red(SPPG)

    # plot_avg_red_values(first_block_avg_red, 'Original Average Red Values for the First Block of Each Frame')
    # plot_avg_red_values(first_block_avg_red_denoised, 'Denoised Average Red Values for the First Block of Each Frame')


    return SPPG, fps

# PPG Cycle Detection

In [6]:
def get_expected_cycle_time(ppg_signal, sampling_rate):
    # Compute the FFT of the PPG signal
    fft_signal = fft(ppg_signal)
    
    # Compute the frequency spectrum and corresponding frequencies
    n = len(ppg_signal)
    freq = fftfreq(n, d=1/sampling_rate)
    
    # Find the index of the dominant frequency component
    dominant_freq_idx = np.argmax(np.abs(fft_signal))
    
    # Calculate the period of the dominant frequency component
    dominant_freq = freq[dominant_freq_idx]
    if dominant_freq == 0:
        # Avoid division by zero when the dominant frequency is 0 Hz
        return np.inf
    expected_period = 1 / dominant_freq
    
    # Convert period to cycle time (in seconds)
    expected_cycle_time = expected_period
    
    return expected_cycle_time


def detect_ppg_cycles_for_one_signal(sppg_signal, sampling_rate):
    ppg_cycles = []

    peaks, _ = find_peaks(sppg_signal)
    valleys, _ = find_peaks(-sppg_signal)

    # Ensure peaks and valleys are not empty
    if len(peaks) == 0 or len(valleys) == 0:
        return ppg_cycles

    # # Plot the PPG signal
    # plt.figure(figsize=(15,4))
    # plt.plot(sppg_signal, label='PPG Signal')

    # # Mark peaks and valleys differently
    # plt.scatter(peaks, sppg_signal[peaks], color='red', marker='o', label='Peaks')
    # plt.scatter(valleys, sppg_signal[valleys], color='blue', marker='x', label='Valleys')

    # Mark start_point, systolic_peak, dicrotic_notch, diastolic_peak, and end_point
    for peak_idx in range(len(peaks) - 1):
        for valley_idx in range(len(valleys) - 1):
            start_idx = valleys[valley_idx]
            end_idx = valleys[valley_idx + 1]

            # Check if the current peak is within the current valley
            if start_idx < peaks[peak_idx] < end_idx:
                try:
                    systolic_peak = peaks[peak_idx]
                    diastolic_peak = peaks[peak_idx + 1]
                    start_point = start_idx
                    end_point = valleys[valley_idx + 2]
                    dicrotic_notch = valleys[valley_idx+1]

                    # Check conditions for valid PPG cycle
                    if (sppg_signal[systolic_peak] > sppg_signal[diastolic_peak]):
                        if (sppg_signal[dicrotic_notch] > sppg_signal[start_point] and sppg_signal[dicrotic_notch] > sppg_signal[end_point]):

                            # Calculate time elapsed for PPG cycle
                            cycle_time = (end_point - start_point) / sampling_rate
                            expected_cycle_time = get_expected_cycle_time(sppg_signal, sampling_rate)
                            error_margin = 0.2 * expected_cycle_time
                            # Check if time elapsed is within threshold
                            if abs(cycle_time - expected_cycle_time) <= error_margin:
                                
                                # # Mark each systolic peak and diastolic peak with a different marker
                                # plt.scatter(systolic_peak, sppg_signal[systolic_peak], color='green', marker='^')
                                # plt.scatter(diastolic_peak, sppg_signal[diastolic_peak], color='pink', marker='d')
                                # # Mark each dicrotic notch with a different marker
                                # plt.scatter(dicrotic_notch, sppg_signal[dicrotic_notch], color='purple', marker='*')
                                ppg_cycles.append((start_point, systolic_peak, dicrotic_notch, diastolic_peak, end_point))
                                # # Mark starting and ending point with a different marker
                                # plt.scatter(start_point, sppg_signal[start_point], color='black', marker='P')
                                # plt.scatter(end_point, sppg_signal[end_point], color='orange', marker='s')
                except IndexError:
                    pass
    # # Plot the legend
    # plt.legend()
    # plt.legend(['PPG Signal', 'Systolic Peak', 'Diastolic Peak', 'Dicrotic Notch', 'Start Point', 'End Point'])
    # plt.xlabel('Frame')
    # plt.ylabel('PPG Signal Value')
    # plt.title('Detected Cycles')
    # plt.show()

    return ppg_cycles

### 
    Cycle selection

In [7]:
def select_three_ppg_cycles(ppg_cycles):
    # Sort the detected PPG cycles based on descending systolic heights
    sorted_cycles = sorted(ppg_cycles, key=lambda cycle: cycle[1], reverse=True)
    # print("Sorted PPG cycles:", sorted_cycles)

    # If at least three PPG cycles are detected, select the top three based on systolic heights
    if len(sorted_cycles) >= 3:
        selected_cycles = sorted_cycles[:3]
    elif len(sorted_cycles) > 0:
        # If less than three PPG cycles are detected, replicate the PPG cycle with maximum systolic height to get three cycles
        selected_cycles = sorted_cycles
        while len(selected_cycles) < 3:
            selected_cycles.append(sorted_cycles[0])
    else:
        # If no PPG cycles are detected, return an empty list
        selected_cycles = []    
    
    return selected_cycles


### 
    Cycle merging

In [8]:
def merge_ppg_cycles(sppg_signal, selected_cycles):
    if len(selected_cycles) == 0:
        return None
    
    max_length = max(end_idx - start_idx + 1 for start_idx, _, _, _, end_idx in selected_cycles)
    merged_signal = np.zeros(max_length)
    num_cycles = len(selected_cycles)
    
    for cycle in selected_cycles:
        start_idx, systolic_peak_idx, dicrotic_notch_idx, diastolic_peak_idx, end_idx = cycle
        cycle_signal = sppg_signal[start_idx:end_idx+1]
        
        # Pad cycle signal if necessary
        if len(cycle_signal) < max_length:
            pad_length = max_length - len(cycle_signal)
            cycle_signal = np.pad(cycle_signal, (0, pad_length), mode='constant')
        
        merged_signal += cycle_signal
    
    # Take the average of the summed signals
    merged_signal /= num_cycles
    
    return merged_signal

### 
    Visualizing Merged Cycle

In [9]:
def visualize_merged_signal(merged_signal, sampling_rate):
    if merged_signal is None:
        print("No merged signal to visualize.")
        return
    
    # Calculate time axis based on sampling rate
    time_axis = np.arange(len(merged_signal)) / sampling_rate
    
    # Plot the merged signal
    plt.figure()
    plt.plot(time_axis, merged_signal, color='blue')
    plt.xlabel('Time (seconds)')
    plt.ylabel('PPG Signal Value')
    plt.title('Merged PPG Signal')
    plt.grid(True)
    plt.show()

In [10]:
def PPG_Cycle_Detection_and_Merging(SPPG, sampling_rate):
    """
    Detect PPG cycles for each block, select three cycles, and merge them into a single signal for each block.
    """
    all_ppg_cycles = []
    all_selected_cycles = []
    all_merged_signals = []

    for i in range(10):
        for j in range(10):
            sppg_signal = SPPG[i, j, :]
            ppg_cycles = detect_ppg_cycles_for_one_signal(sppg_signal, sampling_rate)
            all_ppg_cycles.extend(ppg_cycles)
            # print(f"Detected PPG cycles for block {i}, {j}:", ppg_cycles)

            selected_cycles = select_three_ppg_cycles(ppg_cycles)
            all_selected_cycles.extend(selected_cycles)
            # print(f"Selected PPG cycles {i}, {j}:", selected_cycles)

            merged_signal = merge_ppg_cycles(sppg_signal, selected_cycles)
            # print("Merged PPG signal:", merged_signal)
             # visualize_merged_signal(merged_signal, sampling_rate)
            all_merged_signals.append(merged_signal)

    return all_merged_signals

# Feature Extraction

In [11]:
def extract_features(merged_signal, sampling_rate):
    features = {}
    
    peaks = argrelmax(np.array(merged_signal))[0]
    valleys = argrelmin(np.array(merged_signal))[0]

    
    # Ensure peaks and valleys are not empty
    if len(peaks) <=1 or len(valleys) == 0:
        return features
    
    
    def __next_pow2(x):
        return 1<<(x-1).bit_length()
   
    # Calculate first derivative
    derivative_1 = np.diff(merged_signal, n=1) * (sampling_rate)
    derivative_1_peaks = argrelmax(np.array(derivative_1))[0]
    derivative_1_valleys = argrelmin(np.array(derivative_1))[0]
    
    # Calculate second derivative
    derivative_2 = np.diff(merged_signal, n=2) * (sampling_rate)
    derivative_2_peaks = argrelmax(np.array(derivative_2))[0]
    derivative_2_valleys = argrelmin(np.array(derivative_2))[0]
    
    sp_mag = np.abs(np.fft.fft(merged_signal, n=__next_pow2(len(merged_signal))*16))
    freqs = np.fft.fftfreq(len(sp_mag))
    sp_mag_peaks = argrelmax(sp_mag)[0]
    
    # print (derivative_1_peaks)
    # print (derivative_1_valleys)
    
    systolic_peak_height = merged_signal[peaks[0]]
    features['systolic_peak_height'] = systolic_peak_height

    diastolic_peak_height = merged_signal[peaks[1]]
    features['diastolic_peak_height'] = diastolic_peak_height
    
    dicrotic_notch_height =  merged_signal[valleys[0]]
    features['dicrotic_notch_height'] = dicrotic_notch_height

    # Calculate pulse interval
    pulse_interval = len(merged_signal) / sampling_rate
    features['pulse_interval'] = pulse_interval

    # Calculate augmentation index
    augmentation_index = diastolic_peak_height / systolic_peak_height
    features['augmentation_index'] = augmentation_index
    
    # Calculate relative augmentation index
    relative_augmentation_index = (systolic_peak_height - diastolic_peak_height) / systolic_peak_height
    features['relative_augmentation_index'] = relative_augmentation_index
    
    # Calculate ratio of z and x
    ratio_z_x = dicrotic_notch_height / systolic_peak_height
    features['ratio_z_x'] = ratio_z_x

    # Calculate negative relative augmentation index
    negative_relative_augmentation_index = (diastolic_peak_height - dicrotic_notch_height) / systolic_peak_height
    features['negative_relative_augmentation_index'] = negative_relative_augmentation_index

    # Calculate systolic peak time
    systolic_peak_time = (peaks[0]+1) / sampling_rate
    features['systolic_peak_time'] = systolic_peak_time
    
    # Calculate dicrotic notch time
    dicrotic_notch_time = (valleys[0]+1) / sampling_rate
    features['dicrotic_notch_time'] = dicrotic_notch_time
    
    # Calculate diastolic peak time
    diastolic_peak_time = (peaks[1]+1) / sampling_rate
    features['diastolic_peak_time'] = diastolic_peak_time
    
    # Calculate time between systolic and diastolic peaks
    time_between_peaks = diastolic_peak_time - systolic_peak_time
    features['time_between_peaks'] = time_between_peaks
    

    
    # Calculate time between half systolic peak points
    half_systolic_peak_points = max(merged_signal) / 2
    width = 0
    for value in merged_signal[peaks[0]::-1]:
        if value >= half_systolic_peak_points:
            width += 1
        else:
            break
    for value in merged_signal[peaks[0]+1:]:
        if value >= half_systolic_peak_points:
            width += 1
        else:
            break
        
    time_between_half_systolic_peak_points= width / sampling_rate
    features['time_between_half_systolic_peak_points']=time_between_half_systolic_peak_points
    
  
    # Inflection point area ratio
    inflection_point_area_ratio=sum(merged_signal[:peaks[0]]) / sum(merged_signal[peaks[0]:])
    features['inflection_point_area_ratio'] = inflection_point_area_ratio
    
    # Systolic peak rising slope
    systolic_peak_rising_slope=(systolic_peak_time / systolic_peak_height)
    features['systolic_peak_rising_slope']=systolic_peak_rising_slope
    
    # Diastolic peak falling slope
    diastolic_peak_falling_slope=(diastolic_peak_height / pulse_interval-diastolic_peak_time)
    features['diastolic_peak_falling_slope']=diastolic_peak_falling_slope
    
    # Ratio of t1 and pulse interval time (tpi)
    t1_tpi_ratio = systolic_peak_time / pulse_interval
    features['t1_tpi_ratio'] = t1_tpi_ratio
    
    # Ratio of t2 and pulse interval time (tpi)
    t2_tpi_ratio = dicrotic_notch_time/ pulse_interval
    features['t2_tpi_ratio'] = t2_tpi_ratio
    
    # Ratio of t3 and pulse interval time (tpi)
    t3_tpi_ratio = diastolic_peak_time / pulse_interval
    features['t3_tpi_ratio'] = t3_tpi_ratio
    
    # Ratio of deltaT and pulse interval time (tpi)
    deltaT_tpi_ratio = time_between_peaks / pulse_interval
    features['deltaT_tpi_ratio'] = deltaT_tpi_ratio
    
    # Interval time from first PPG cycle start point (l1) in first derivative of PPF (Sf) to first maxima (a1) of Sf 
    # t_a1
    interval_time_from_l1_to_a1 = (derivative_1_peaks[0]) / (sampling_rate)
    features['interval_time_from_l1_to_a1'] = interval_time_from_l1_to_a1
    
    # Interval time from point (l1) to first minima of first PPG cycle (b1) in the Sf
    # t_b1
    interval_time_from_l1_to_b1 = (derivative_1_valleys[0]) / (sampling_rate)
    features['interval_time_from_l1_to_b1'] = interval_time_from_l1_to_b1
    
    # Interval time from point (l1) to second maxima of the first PPG cycle (e1) in the Sf
    # t_e1
    interval_time_from_l1_to_e1 = (derivative_1_peaks[1]) / (sampling_rate)
    features['interval_time_from_l1_to_e1']=interval_time_from_l1_to_e1
    
    # Interval time from point (l1) to second minima of the first PPG cycle (f1) in the Sf
    # t_f1
    if len(derivative_1_valleys) >= 2:
        interval_time_from_l1_to_f1 = (derivative_1_valleys[1]) / (sampling_rate)
        features['interval_time_from_l1_to_f1'] = interval_time_from_l1_to_f1
    else:
        # Handle the case when derivative_1_valleys does not have enough elements
        interval_time_from_l1_to_f1 = 0
        features['interval_time_from_l1_to_f1'] = interval_time_from_l1_to_f1

    # Ratio of first minima (b2) and first maxima (a2) in the second derivative of PPG signal (Sf2)
    # b2/a2
    a_2 = derivative_2[derivative_2_peaks[0]]
    b_2 = derivative_2[derivative_2_valleys[0]]
    features['b2_a2_ratio'] = b_2 / a_2	
    
    # Ratio of second maxima (e2) in Sf2 and a2
    # e2/a2
    if len(derivative_2_peaks) >= 2:
        e_2 = derivative_2[derivative_2_peaks[1]]
        features['e2_a2_ratio'] = e_2 / a_2
    else:
        # Handle the case when derivative_2_peaks does not have enough elements
        e_2 = 0
        features['e2_a2_ratio'] = e_2
        
    # Ratio of (b2+e2) and a2
    if b_2 is not None and e_2 is not None:
        b2_e2_a2_ratio = (b_2 + e_2) / a_2
        features['b2_e2_a2_ratio'] = b2_e2_a2_ratio 
    else:
        b2_e2_a2_ratio = 0 # Or any other appropriate value
        features['b2_e2_a2_ratio'] = b2_e2_a2_ratio

    # Interval time from the second PPG cycle start point (l2) in second derivative of PPG to a2,
    interval_time_from_l2_to_a2=(derivative_2_peaks[0]) / (sampling_rate)
    features['interval_time_from_l2_to_a2'] = interval_time_from_l2_to_a2
    
    # Interval time from point l2 ti b2
    interval_time_from_l2_to_b2=(derivative_2_valleys[0]) / (sampling_rate)
    features['interval_time_from_l2_to_b2'] = interval_time_from_l2_to_b2
    
    
    # Ratio of ta1 and tpi
    ta1_tpi_ratio = interval_time_from_l1_to_a1 / pulse_interval
    features['ta1_tpi_ratio'] = ta1_tpi_ratio
    
      # Ratio of tb1 and tpi
    tb1_tpi_ratio = interval_time_from_l1_to_b1 / pulse_interval
    features['tb1_tpi_ratio'] = tb1_tpi_ratio
    
    
    # Ratio of te1 and tpi
    te1_tpi_ratio = interval_time_from_l1_to_e1 / pulse_interval
    features['te1_tpi_ratio'] = te1_tpi_ratio
    
    # Ratio of time interval of l1 (tl1) and tpi
    # tf1/tpi
    if interval_time_from_l1_to_f1 is not None:
        tf1_tpi_ratio = interval_time_from_l1_to_f1 / pulse_interval
        features['tf1_tpi_ratio'] = tf1_tpi_ratio
    else:
        # Handle the case when interval_time_from_l1_to_f1 is None
        tf1_tpi_ratio = 0
        features['tf1_tpi_ratio'] = tf1_tpi_ratio
        
    #Ratio of ta2 and tpi
    ta2_tpi_ratio = interval_time_from_l2_to_a2 / pulse_interval
    features['ta2_tpi_ratio'] = ta2_tpi_ratio
    
    #Ratio of tb2 and tpi
    tb2_tpi_ratio = interval_time_from_l2_to_b2 / pulse_interval
    features['tb2_tpi_ratio'] = tb2_tpi_ratio
    
    # Ratio of ta1+ta2 and pulse interval (tpi), ta1+ta2/tpi
    ta1_ta1_tpi_ratio = (interval_time_from_l1_to_a1 + interval_time_from_l2_to_a2) / pulse_interval
    features['ta1_ta1_tpi_ratio'] = ta1_ta1_tpi_ratio
    
    # Ratio of (tb1+tb2) and pulse interval (tpi)
    tb1_tb2_tpi_ratio = (interval_time_from_l1_to_b1 + interval_time_from_l2_to_b2) / pulse_interval
    features['tb1_tb2_tpi_ratio'] = tb1_tb2_tpi_ratio
    
    # Ratio of (te1+t2) and pulse interval (tpi)
    te1_te2_tpi_ratio = (interval_time_from_l1_to_e1 + dicrotic_notch_time) / pulse_interval
    features['te1_te2_tpi_ratio'] = te1_te2_tpi_ratio
    
    # Ratio of tl1+t3 and pulse interval (tpi)
    if interval_time_from_l1_to_f1 is not None:
        tf1_t3_tpi_ratio = (interval_time_from_l1_to_f1 + diastolic_peak_time) / pulse_interval
        features['tf1_t3_tpi_ratio'] = tf1_t3_tpi_ratio
    else:
        # Handle the case when interval_time_from_l1_to_f1 is None
        tf1_t3_tpi_ratio = 0
        features['tf1_t3_tpi_ratio'] = tf1_t3_tpi_ratio
        
        
    # Fundamental component frequency obtained from Fast Fourier Transformation (FFT)
    f_base = freqs[sp_mag_peaks[0]] * sampling_rate
    features['f_base'] = f_base
    
    # Fundamental component magnitude from FFT, |sbase|
    sp_mag_base = sp_mag[sp_mag_peaks[0]] / len(merged_signal)
    features['sp_mag_base'] = sp_mag_base
    
    # Second component frequency obtained from FFT. Such that, fbase<f2nd
    if len(sp_mag_peaks) >= 2:
        f_second = freqs[sp_mag_peaks[1]] * sampling_rate
        features['f_second'] = f_second
    else:
        # Handle the case when sp_mag_peaks does not have enough elements
        f_second = 0
        features['f_second'] = f_second


    # Second component magnitude from FFT
    if len(sp_mag_peaks) >= 2:
        sp_mag_second = sp_mag[sp_mag_peaks[1]] / len(merged_signal)
        features['sp_mag_second'] = sp_mag_second
    else:
        # Handle the case when sp_mag_peaks does not have enough elements
        sp_mag_second = 0
        features['sp_mag_second'] = sp_mag_second
        
        
    # Third component frequency obtained from FFT. Such that, fbase<f2nd<f3rd
    if len(sp_mag_peaks) >= 3:
        f_third = freqs[sp_mag_peaks[2]] * sampling_rate
        features['f_third'] = f_third
    else:
        # Handle the case when sp_mag_peaks does not have enough elements
        f_third = None
        features['f_third'] = f_third
    
    
    # Third component magnitude acquired from FFT
    sp_mag_third = sp_mag[sp_mag_peaks[2]] / len(merged_signal)
    features['sp_mag_third'] = sp_mag_third
        
    
    return features

In [12]:
def Feature_Extraction(all_merged_signals, sampling_rate):
    """
    Extract features from merged signals and return a DataFrame.
    """
    # Remove None values
    filtered_array = [cycle for cycle in all_merged_signals if cycle is not None]
    all_merged_signals = filtered_array

    all_extracted_features = []
    for merged_signal in all_merged_signals:
        extracted_features = extract_features(merged_signal, sampling_rate)
        all_extracted_features.append(extracted_features)

    # Filter out empty dictionaries
    non_empty_dicts = [d for d in all_extracted_features if d]

    # Convert list of dictionaries to DataFrame
    df = pd.DataFrame(non_empty_dicts)

    return df

In [13]:
def calculate_interval_bins(df):
    """
    Calculate interval bins for each column based on minimum and maximum values.
    """
    min_max_values = df.agg(['min', 'max'])
    interval_bins = {col: pd.cut(df[col], bins=np.linspace(min_val, max_val, num=11), include_lowest=True).cat.categories for col, (min_val, max_val) in min_max_values.items()}
    return interval_bins

def bin_data(df, interval_bins):
    """
    Bin data based on interval bins.
    """
    df_binned = pd.DataFrame({col: pd.cut(df[col], bins=interval_bins[col]) for col in df.columns})
    return df_binned

def extract_adjacent_bins(interval_bins, largest_bin):
    """
    Extract adjacent bins for the largest bin.
    """
    largest_interval_left = largest_bin.left
    largest_interval_right = largest_bin.right
    adjacent_right = None
    adjacent_left = None
    for interval in interval_bins:
        interval_left = interval.left
        interval_right = interval.right
        if interval_left == largest_interval_right:
            adjacent_right = interval
        if interval_right == largest_interval_left:
            adjacent_left = interval
    return adjacent_left, adjacent_right

def calculate_averaged_value(largest_bin_values, adjacent_left_values, adjacent_right_values, total_count, col):
    """
    Calculate the averaged value.
    """
    largest_bin_sum = largest_bin_values[col].sum()
    adjacent_left_sum = adjacent_left_values[col].sum() if not adjacent_left_values.empty else 0
    adjacent_right_sum = adjacent_right_values[col].sum() if not adjacent_right_values.empty else 0
    averaged_value = (largest_bin_sum + adjacent_left_sum + adjacent_right_sum) / total_count if total_count != 0 else 0
    return averaged_value

def plot_histogram(df):
    """
    Plot histogram for each feature.
    """
    for col in df.columns:
        plt.figure()
        plt.hist(df[col], bins=10, edgecolor='black')
        plt.title(col)
        plt.xlabel('Value')
        plt.ylabel('Frequency')
        plt.show()

In [14]:
def Feature_Selection_and_Vector_Generation(df):
    """
    Perform feature selection and generate a feature vector.
    """
    # Determine interval bins
    interval_bins = calculate_interval_bins(df)

    # Bin data
    df_binned = bin_data(df, interval_bins)

    # Plot histogram
    # plot_histogram(df)

    f_cap_vec = {}

    for col in df.columns:
        # Get the counts of values in each bin
        bin_counts = df_binned[col].value_counts()

        # Find the largest bin
        largest_bin = bin_counts.idxmax()

        # Extract adjacent bins
        adjacent_left, adjacent_right = extract_adjacent_bins(interval_bins[col], largest_bin)

        # Get the indices of the largest bin and its adjacent bins
        largest_bin_idx = bin_counts.index.get_loc(largest_bin)
        left_adjacent_bin_idx = bin_counts.index.get_loc(adjacent_left) if adjacent_left is not None else None
        right_adjacent_bin_idx = bin_counts.index.get_loc(adjacent_right) if adjacent_right is not None else None

        # Extract the bins and their counts
        largest_bin_count = bin_counts.iloc[largest_bin_idx]
        left_adjacent_bin_count = bin_counts.iloc[left_adjacent_bin_idx] if left_adjacent_bin_idx is not None else 0
        right_adjacent_bin_count = bin_counts.iloc[right_adjacent_bin_idx] if right_adjacent_bin_idx is not None else 0
        # print("largest_bin_count:", largest_bin_count)
        # print("left_adjacent_bin_count:", left_adjacent_bin_count)
        # print("right_adjacent_bin_count:", right_adjacent_bin_count)
        total_count = largest_bin_count + left_adjacent_bin_count + right_adjacent_bin_count

        # Extract values from the largest bin and adjacent bins
        largest_bin_values = df[df[col].apply(lambda x: x in largest_bin)]
        adjacent_left_values = df[df[col].apply(lambda x: x in adjacent_left)] if adjacent_left is not None else pd.DataFrame()
        adjacent_right_values = df[df[col].apply(lambda x: x in adjacent_right)] if adjacent_right is not None else pd.DataFrame()

        # print("adjacent_left_values:", adjacent_left_values)
        # print("adjacent_right_values:", adjacent_right_values)

        # Calculate the averaged value
        averaged_value = calculate_averaged_value(largest_bin_values, adjacent_left_values, adjacent_right_values, total_count, col)

        # Update the final feature vector f^ for the subject
        f_cap_vec[col] = averaged_value

    return f_cap_vec

In [15]:
def process_videos(folder_path, markup_csv_path):
    """
    Process videos in the folder and match them with the corresponding HB values.
    """

    # List all files in the videos folder
    video_files = os.listdir(VIDEO_PATH)
    video_names=[]
    for video in video_files:
        video_names.append(video)
        

    # Load markup CSV
    markup_df = pd.read_csv(markup_csv_path)
    dataset = markup_df[markup_df["FileName"].isin(video_names)]
    dataset = dataset.reset_index(drop=True)


    # Iterate through videos in the folder
    Final_Feature_Matrix = []
    for video_file in os.listdir(folder_path):
        if video_file.endswith((".mp4", ".avi", ".mkv", ".mov")):
            video_path = os.path.join(folder_path, video_file)

    

            # Extract PPG signal from the given video
            SPPG, sampling_rate = PPG_Signal_Extraction(video_path)

            # Detect PPG cycles for each block, select three cycles, and merge them into a single sign for each block
            all_merged_signals = PPG_Cycle_Detection_and_Merging(SPPG, sampling_rate)

            # Extract features from merged signals and return a DataFrame
            Feature_matrix = Feature_Extraction(all_merged_signals, sampling_rate)

            # Perform feature selection and generate a feature vector
            final_feature_vector = Feature_Selection_and_Vector_Generation(Feature_matrix)
            # print("Final feature vector:")
            # print(final_feature_vector)

            # Match with corresponding HB value from markup CSV
            markup_row = dataset[dataset['FileName'] == video_file]
            if not markup_row.empty:
                hb_value = markup_row.iloc[0]['HB']
                final_feature_vector['HB'] = hb_value
                Final_Feature_Matrix.append((final_feature_vector))
    
    return Final_Feature_Matrix

Final_Feature_Matrix = process_videos(VIDEO_PATH, DATASET_PATH)

Raw PPG Signal
Denoised PPG Signal after Butterworth Filter
SPPG Signal after FFT and Butterworth Filter
Raw PPG Signal
Denoised PPG Signal after Butterworth Filter
SPPG Signal after FFT and Butterworth Filter
Raw PPG Signal
Denoised PPG Signal after Butterworth Filter
SPPG Signal after FFT and Butterworth Filter
Raw PPG Signal
Denoised PPG Signal after Butterworth Filter
SPPG Signal after FFT and Butterworth Filter
Raw PPG Signal
Denoised PPG Signal after Butterworth Filter
SPPG Signal after FFT and Butterworth Filter
Raw PPG Signal
Denoised PPG Signal after Butterworth Filter
SPPG Signal after FFT and Butterworth Filter
Raw PPG Signal
Denoised PPG Signal after Butterworth Filter
SPPG Signal after FFT and Butterworth Filter
Raw PPG Signal
Denoised PPG Signal after Butterworth Filter
SPPG Signal after FFT and Butterworth Filter
Raw PPG Signal
Denoised PPG Signal after Butterworth Filter
SPPG Signal after FFT and Butterworth Filter
Raw PPG Signal
Denoised PPG Signal after Butterworth Fi

In [20]:
Final_Feature_Matrix=pd.DataFrame(Final_Feature_Matrix)
Final_Feature_Matrix

Unnamed: 0,systolic_peak_height,diastolic_peak_height,dicrotic_notch_height,pulse_interval,augmentation_index,relative_augmentation_index,ratio_z_x,negative_relative_augmentation_index,systolic_peak_time,dicrotic_notch_time,...,tb1_tb2_tpi_ratio,te1_te2_tpi_ratio,tf1_t3_tpi_ratio,f_base,sp_mag_base,f_second,sp_mag_second,f_third,sp_mag_third,HB
0,0.035877,-0.00238,-0.01382,0.800617,0.285888,0.714112,-0.124645,0.395572,0.246995,0.384127,...,0.509012,0.915287,1.156546,1.269531,0.020138,2.899038,0.01678,4.349459,0.007699,14.4
1,0.259791,0.066996,0.011892,0.88,0.220943,0.354818,-0.0062,0.108756,0.225556,0.478125,...,0.200063,0.802149,1.235286,1.077474,0.131842,2.428194,0.047822,4.098899,0.032002,12.3
2,0.19852,0.092015,0.001214,0.795935,0.304948,0.66679,0.059187,0.092954,0.223611,0.37029,...,0.533699,0.961676,1.235809,1.394265,0.076093,3.132722,0.045012,4.660247,0.032743,11.4
3,0.30844,0.219233,-0.073032,1.061111,0.257129,0.742871,0.158025,0.124214,0.223333,0.342857,...,0.233878,0.892314,1.109273,0.90554,0.209671,2.136009,0.04698,3.381696,0.0196,11.5
4,0.375177,-0.031959,-0.105839,0.655399,-0.10125,1.10125,-0.211145,0.123289,0.227778,0.396923,...,0.629773,1.209019,1.507031,1.773897,0.26308,3.661672,0.082123,5.557901,0.066112,10.7
5,0.732732,0.264411,0.001338,0.692063,0.478938,0.521062,0.097922,0.105896,0.245513,0.343478,...,0.501144,0.953203,1.268024,1.370675,0.312333,3.34517,0.219745,4.923177,0.059541,10.6
6,0.948609,-0.01993,-0.061477,1.01875,0.416614,0.583386,-0.178632,0.082003,0.24023,0.516667,...,0.432475,1.115175,1.368069,1.055961,0.294859,2.564453,0.084862,4.177734,0.031266,12.0
7,0.360602,0.164158,-0.053781,0.6125,0.523026,0.476974,-0.428153,0.208102,0.250575,0.596667,...,0.371132,0.867319,1.123964,0.704848,0.150883,1.954102,0.149089,3.013393,0.068687,10.3
8,0.948192,0.695233,0.533144,0.950617,0.729899,0.270101,0.650178,0.121391,0.264368,0.389394,...,0.35045,0.728541,1.315293,0.708008,0.291671,2.4273,0.146984,3.571777,0.132832,13.9
9,0.306499,0.082217,-0.027719,0.636486,0.08763,0.91237,-0.030655,0.129185,0.228947,0.394527,...,0.660246,1.201452,1.516219,1.766968,0.101081,3.632812,0.05924,5.735779,0.026744,12.6


# SVR Model Generation

In [23]:


# Step 2: Define Features and Target Variable
X = Final_Feature_Matrix # Features
y = Final_Feature_Matrix['HB']  # Target variable

# Step 3: Split the Data into Training and Testing Sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Step 4: Train the SVR Model
svr = SVR(kernel='rbf')  # You can choose different kernels based on your data
svr.fit(X_train, y_train)

# Step 5: Predict on Testing Data
y_pred = svr.predict(X_test)

# Step 6: Evaluate the Model
mse = mean_squared_error(y_test, y_pred)
mape = mean_absolute_percentage_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)

# Print evaluation metrics
print("Mean Squared Error:", mse)
print("Mean Absolute Percentage Error:", mape)
print("R^2 Score:", r2)


Mean Squared Error: 2.77007354566967
Mean Absolute Percentage Error: 0.11245343739637419
R^2 Score: -3.3282399151088615
