The extraction of Pulse rate from the video file for 1 video file - video id : Copy of SL133

In [None]:
# ✅ Mount Google Drive
drive.mount('/content/drive')

In [45]:
 #importing required libraries
import numpy as np
import cv2
import os
from scipy.signal import find_peaks, butter, filtfilt, welch

# ✅ Butterworth Bandpass Filter : Butterworth bandpass filter to remove noise and retain only the frequencies related to pulse rate. Here lowcut=0.7 Hz and highcut=3.0 Hz restrict the frequency range to typical human heart rate (42 - 180 BPM).butter(order, [low, high], btype='band') creates the filter.
def butter_bandpass_filter(data, lowcut=0.7, highcut=3.0, fs=30, order=5):
    nyquist = 0.5 * fs
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = butter(order, [low, high], btype='band')
    return filtfilt(b, a, data)

# ✅ Extract Pulse from Video
def extract_pulse(video_path):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"⚠️ Cannot open video file: {video_path}")
        return None

    fps = cap.get(cv2.CAP_PROP_FPS)  # ✅ Extract Frames per Second (FPS)
    heart_rates = []

    while True:                     #frame processing loop
        ret, frame = cap.read()
        if not ret:
            break

        # ✅ Extract Forehead Region (Forehead region being the Region of Interest (ROI))
        roi = frame[50:150, 100:250]
        mean_intensity = np.mean(roi)  # ✅ Get mean pixel intensity which computes the average brightness of the ROI. heart_rates.append(mean_intensity): Stores the mean pixel intensity as a proxy for pulse signals.
        heart_rates.append(mean_intensity)

    cap.release()  # ✅ Release Video Capture

    # ✅ Ensure we have enough frames for processing
    if len(heart_rates) < 30:
        print("⚠️ Not enough frames for pulse extraction!")
        return None

    # ✅ Apply Bandpass Filter :Filters the intensity values to remove noise and retain only relevant heart rate signals.
    filtered_signal = butter_bandpass_filter(heart_rates, lowcut=0.7, highcut=3.0, fs=fps)

#two methods used for filtering signal and then both combined using their weighted combination.

    # ✅ Method 1: Peak Detection (Higher Threshold for True Peaks)
    peaks, _ = find_peaks(filtered_signal, distance=int(fps * 0.5), prominence=0.07 * np.std(filtered_signal))
    peak_based_pulse = (len(peaks) / (len(heart_rates) / fps)) * 60

    # ✅ Method 2: Welch’s Power Spectral Density (Better FFT)
    f_welch, Pxx_welch = welch(filtered_signal, fs=fps, nperseg=len(filtered_signal) // 2)
    valid_freqs = (f_welch > 0.7) & (f_welch < 3.0)
    if np.any(valid_freqs):
        dominant_freq = f_welch[valid_freqs][np.argmax(Pxx_welch[valid_freqs])]
        fft_based_pulse = dominant_freq * 60
    else:
        fft_based_pulse = 0  # Default if no valid frequency is found

    # ✅ Final Weighted Pulse Calculation (Increased FFT Weight)
    final_pulse_rate = (peak_based_pulse * 0.35) + (fft_based_pulse * 0.80)

    return {
        "video_id": os.path.basename(video_path).replace(".mp4", ""),
        "peak_based_pulse": round(peak_based_pulse, 2),
        "fft_based_pulse": round(fft_based_pulse, 2),
        "final_pulse_rate": round(final_pulse_rate, 2)
    }

# ✅ Test Extraction on a Sample Video
video_path = "/content/sample_data/testvideo/Copy of SL133.mp4"  # Update with actual path
result = extract_pulse(video_path)
print(result)


{'video_id': 'Copy of SL133', 'peak_based_pulse': 72.59, 'fft_based_pulse': 79.47, 'final_pulse_rate': 88.98}


The extraction of Pulse rate from multiple video files

In [None]:
#import numpy as np
#import cv2
#import os
import pandas as pd
from google.colab import drive
#from scipy.signal import find_peaks, butter, filtfilt, welch


# ✅ Define the folder containing videos (Update this path)
video_folder = "/content/sample_data"


# ✅ Butterworth Bandpass Filter
def butter_bandpass_filter(data, lowcut=0.7, highcut=3.0, fs=30, order=5):
    nyquist = 0.5 * fs
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = butter(order, [low, high], btype='band')
    return filtfilt(b, a, data)

# ✅ Extract Pulse from a Single Video
def extract_pulse(video_path):
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"⚠️ Cannot open video file: {video_path}")
        return None

    fps = cap.get(cv2.CAP_PROP_FPS)  # ✅ Extract Frames per Second (FPS)
    heart_rates = []

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        # ✅ Extract Forehead Region (Adjust ROI if needed)
        roi = frame[50:150, 100:250]  # Example ROI
        mean_intensity = np.mean(roi)  # ✅ Get mean pixel intensity
        heart_rates.append(mean_intensity)

    cap.release()  # ✅ Release Video Capture

    # ✅ Ensure we have enough frames for processing
    if len(heart_rates) < 30:
        print(f"⚠️ Not enough frames for {video_path}!")
        return None

    # ✅ Apply Bandpass Filter
    filtered_signal = butter_bandpass_filter(heart_rates, lowcut=0.7, highcut=3.0, fs=fps)

    # ✅ Method 1: Peak Detection
    peaks, _ = find_peaks(filtered_signal, distance=int(fps * 0.5), prominence=0.07 * np.std(filtered_signal))
    peak_based_pulse = (len(peaks) / (len(heart_rates) / fps)) * 60

    # ✅ Method 2: Welch’s Power Spectral Density
    f_welch, Pxx_welch = welch(filtered_signal, fs=fps, nperseg=len(filtered_signal) // 2)
    valid_freqs = (f_welch > 0.7) & (f_welch < 3.0)
    if np.any(valid_freqs):
        dominant_freq = f_welch[valid_freqs][np.argmax(Pxx_welch[valid_freqs])]
        fft_based_pulse = dominant_freq * 60
    else:
        fft_based_pulse = 0  # Default if no valid frequency is found

    # ✅ Final Weighted Pulse Calculation
    final_pulse_rate = (peak_based_pulse * 0.35) + (fft_based_pulse * 0.80)

    return {
        "video_id": os.path.basename(video_path),
        "peak_based_pulse": round(peak_based_pulse, 2),
        "fft_based_pulse": round(fft_based_pulse, 2),
        "final_pulse_rate": round(final_pulse_rate, 2)
    }

# ✅ Process All Videos in Google Drive Folder : the loop for running the above code (same as for 1 video file )
results = []
for file_name in os.listdir(video_folder):
    if file_name.endswith(".mp4"):
        video_path = os.path.join(video_folder, file_name)
        print(f"🚀 Processing: {file_name}")
        result = extract_pulse(video_path)
        if result:
            results.append(result)

# ✅ Save Results to CSV
output_csv = "/content/drive/My Drive/pulse_results.csv"
df = pd.DataFrame(results)
df.to_csv(output_csv, index=False)
print(f"✅ Results saved to {output_csv}")

# ✅ Print Summary
print(df)


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
🚀 Processing: Copy of SL496.mp4
🚀 Processing: Copy of IN901.mp4
🚀 Processing: Copy of SL191.mp4
🚀 Processing: Copy of SL225.mp4
🚀 Processing: Copy of SL413.mp4
✅ Results saved to /content/drive/My Drive/pulse_results.csv
            video_id  peak_based_pulse  fft_based_pulse  final_pulse_rate
0  Copy of SL496.mp4             66.84            51.93             64.93
1  Copy of IN901.mp4             63.04            47.16             59.79
2  Copy of SL191.mp4             69.99            49.30             63.94
3  Copy of SL225.mp4             69.61            60.24             72.56
4  Copy of SL413.mp4             65.50            51.61             64.22


The results above are inconsistent for multiple videos but accurate for single sample as depicted above

Using the results obtained from the above measurement for training the model for prediction of pulse rate

In [56]:
import time
import cv2
import numpy as np
import joblib
import pandas as pd
import os

def extract_features_from_video(video_path):
    """Extract features from a given video."""
    start_time = time.time()

    cap = cv2.VideoCapture(video_path)
    heart_rates = []
    frame_skip = 10

    if not cap.isOpened():
        print(f"⚠️ Cannot open video file: {video_path}")
        return None

    frame_count = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        if frame_count % frame_skip == 0:
            frame = cv2.resize(frame, (640, 480))
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            h, w = gray.shape
            forehead = gray[int(0.2*h):int(0.3*h), int(0.3*w):int(0.7*w)]
            heart_rates.append(np.mean(forehead))

        frame_count += 1

    cap.release()

    if len(heart_rates) == 0:
        print("⚠️ No valid frames extracted for heart rate calculation!")
        return None

    elapsed_time = time.time() - start_time
    return {
        "video_id": os.path.basename(video_path).replace(".mp4", ""),
        "processing_time_sec": round(elapsed_time, 2)
    }

def predict_pulse_rate(video_path, model_path):
    """Predict pulse rate from a new video using a trained model."""

    if not os.path.exists(model_path):
        print(f"❌ Model file not found at: {model_path}")
        return None

    model = joblib.load(model_path)
    features = extract_features_from_video(video_path)

    if features is None:
        print("❌ Could not extract features from video.")
        return None

    feature_df = pd.DataFrame([features])[['processing_time_sec']]
    predicted_pulse = model.predict(feature_df)[0]

    print(f"🎯 Predicted Pulse Rate: {predicted_pulse:.2f} BPM")
    return predicted_pulse

# -------------------------------------------------------------------------------
# **📌 Example: Predict Pulse Rate from a New Video**
# -------------------------------------------------------------------------------
new_video = "/content/sample_data/testvideo/Copy of SL133.mp4"  # Change path to your test video
model_path = "/content/sample_data/trained_model.pkl"  # Specify the model path

predicted_pulse = predict_pulse_rate(new_video, model_path)


🎯 Predicted Pulse Rate: 73.20 BPM


