### rPPG detector (GRGB)

In [1]:
import cv2
import mediapipe as mp
import numpy as np
import csv
import time
import matplotlib.pyplot as plt
from itertools import zip_longest
import re
import os

# Initialize MediaPipe Face Mesh
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(
    static_image_mode=False,
    max_num_faces=1,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5
)

def extract_filename(file_path):
    # filename pattern (e.g., "002_006")
    match = re.search(r'([^/]+)\.mp4$', file_path)
    if match:
        return match.group(1)
    else:
        return None

def get_videos_info(root_folder):
    video_info_list = []
    for root, dirs, files in os.walk(root_folder):
        for file in files:
            if file.endswith(('.mp4', '.mkv', '.avi', '.mov')):
                file_path = os.path.join(root, file)
                video_info_list.append(file_path)
    return video_info_list

def save_rppg_signal(signal, length, dataset, compression, amount_videos, category, filename):
        with open(f'2_rPPG/{dataset}_{compression}_{str(amount_videos)}_{length}/{category}/{filename}.csv', mode='w', newline='') as file:
            writer = csv.writer(file)
            writer.writerow(signal)


'''
USAGE:

• Put the desired video dataset into the 1_video_input folder
• Specify the parameters below
• create_folders will build all necessary directories

'''

dataset = 'faceshifter'
compression = 'raw'
amount_videos = 200 #len(get_videos_info('1_video_input/'))
category = 'original' #'original' #'manipulated'
loop_video = False 

create_folders = False

# Create necessary folders if they don't exist
if create_folders:
    subfolders = ['','/original','/manipulated','/stats']
    for i in subfolders:
        # os.makedirs(f'2_rPPG/{dataset}_{compression}_{str(amount_videos)}_5s'+i, exist_ok=True)
        # os.makedirs(f'2_rPPG/{dataset}_{compression}_{str(amount_videos)}_10s'+i, exist_ok=True)
        # os.makedirs(f'2_rPPG/{dataset}_{compression}_{str(amount_videos)}_20s'+i, exist_ok=True)
        os.makedirs(f'2_rPPG/{dataset}_{compression}_{str(amount_videos)}_inf'+i, exist_ok=True)

I0000 00:00:1726160194.250971 29658749 gl_context.cc:357] GL version: 2.1 (2.1 Metal - 88), renderer: Apple M1


In [2]:
'''
Program to extract specific ROIs from the video frame. 
Compute the average values of color channels for the selected ROIs.
Mediapipe library is used to detect the face landmarks
'''

#ROI loction points according to the paper by F.Haugg et al. (added additional points in each case)
landmark_head = [107, 66, 69, 67, 109, 10, 338, 297, 299, 296, 336, 9]
landmark_lcheek = [118, 119, 120, 47, 126, 209, 49, 129, 203, 205, 50]
landmark_rcheek = [347, 348, 349, 277, 355, 429, 279, 358, 423, 425, 280]

video_list = get_videos_info('1_video_input/')

for i in video_list:
    filename = extract_filename(i)
    cap = cv2.VideoCapture(i)

    # Get the frames per second (fps) of the video
    fps = cap.get(cv2.CAP_PROP_FPS)
    # Get the total number of frames in the video
    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    # Limit frames to specified maximal duration in seconds
    max_frames = total_frames
    #max_frames = max_duration * fps  # Number of frames
    if not cap.isOpened():
        print("Error: Could not open video file.")
        exit()

    #Initialise the list of color channels
    avg_b = []
    avg_g = []
    avg_r = []

    gr_signal = []
    gb_signal = []
    grgb_signal = []

    frame_count = 0  # Initialize frame counter
    while frame_count < max_frames:
        if cv2.waitKey(1) & 0xFF == ord('q'):
            cap.release()
            cv2.destroyAllWindows()
            cv2.waitKey(1)
            break

        ret, frame = cap.read()
        if not ret:
            if loop_video:
                cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                ret, frame = cap.read()  # Reads a new frame
            else:  # Ends video playback
                cap.release()
                cv2.destroyAllWindows()
                cv2.waitKey(1)
                break

        # Convert the frame to RGB
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Process the frame with MediaPipe Face Mesh
        results = face_mesh.process(rgb_frame)

        # Extract the values of specific ROI with respect to landmark position points
        if results.multi_face_landmarks:
            for face_landmarks in results.multi_face_landmarks:
                # Extracting specific landmark coordinates
                h, w, _ = frame.shape
                landmark_coords_head = []
                landmark_coords_lcheek = []
                landmark_coords_rcheek = []

                for i, j, k in zip_longest(landmark_head, landmark_lcheek, landmark_rcheek, fillvalue = None):

                    if i is not None:
                        landmark_h = face_landmarks.landmark[i]
                        x = int(landmark_h.x * w)
                        y = int(landmark_h.y * h)
                        landmark_coords_head.append((x, y))
                        #cv2.circle(frame, (x, y), radius=1, color=(0, 0, 0), thickness=1)
                    
                    if j is not None:

                        landmark_lc = face_landmarks.landmark[j]
                        x1 = int(landmark_lc.x * w)
                        y1 = int(landmark_lc.y * h)
                        landmark_coords_lcheek.append((x1, y1))
                        #cv2.circle(frame, (x1, y1), radius=1, color=(0, 0, 0), thickness=1)
                    
                    if k is not None:

                        landmark_rc = face_landmarks.landmark[k]
                        x2 = int(landmark_rc.x * w)
                        y2 = int(landmark_rc.y * h)
                        landmark_coords_rcheek.append((x2, y2))
                        #cv2.circle(frame, (x2, y2), radius=1, color=(0, 0, 0), thickness=1)
                    
                # Determine the bounding box around the specified landmarks -
                # Forehead
                x_coords, y_coords = zip(*landmark_coords_head)
                x_min, x_max = max(0, min(x_coords)), min(w, max(x_coords))
                y_min, y_max = max(0, min(y_coords)), min(h, max(y_coords))
                # ROI
                forehead_roi = frame[y_min:y_max, x_min:x_max]
                forehead_blue = np.mean(forehead_roi[:, :, 0])
                forehead_green = np.mean(forehead_roi[:, :, 1])
                forehead_red = np.mean(forehead_roi[:, :, 2])

                #lcheek
                x_coords, y_coords = zip(*landmark_coords_lcheek)
                x_min, x_max = max(0, min(x_coords)), min(w, max(x_coords))
                y_min, y_max = max(0, min(y_coords)), min(h, max(y_coords))
                # ROI
                lcheek_roi = frame[y_min:y_max, x_min:x_max]
                lcheek_blue = np.mean(lcheek_roi[:, :, 0])
                lcheek_green = np.mean(lcheek_roi[:, :, 1])
                lcheek_red = np.mean(lcheek_roi[:, :, 2])

                #rcheek
                x_coords, y_coords = zip(*landmark_coords_rcheek)
                x_min, x_max = max(0, min(x_coords)), min(w, max(x_coords))
                y_min, y_max = max(0, min(y_coords)), min(h, max(y_coords))
                # ROI
                rcheek_roi = frame[y_min:y_max, x_min:x_max]
                rcheek_blue = np.mean(rcheek_roi[:, :, 0])
                rcheek_green = np.mean(rcheek_roi[:, :, 1])
                rcheek_red = np.mean(rcheek_roi[:, :, 2])

                # Crop the image
                #cropped_image = frame[y_min:y_max, x_min:x_max]
                #average = np.mean(forehead_roi[:, :, 0], lcheek_roi[:, :, 0], rcheek_roi[:, :, 0])
                #print(forehead_red, lcheek_red, rcheek_red)
                
                avg_r.append(np.mean([forehead_red, lcheek_red, rcheek_red]))
                avg_g.append(np.mean([forehead_green, lcheek_green, rcheek_green]))
                avg_b.append(np.mean([forehead_blue, lcheek_blue, rcheek_blue]))

                '''
                # Display the cropped image in a separate window
                cv2.namedWindow("Cropped_Image", cv2.WINDOW_NORMAL) 
                cv2.resizeWindow("Cropped_Image", 400,200)
                cv2.imshow('Cropped_Image', cropped_image)
                '''
        # Display the frame
        cv2.namedWindow("rPPG", cv2.WINDOW_NORMAL) 
        cv2.resizeWindow("rPPG", 900,650)
        cv2.imshow("rPPG", frame)
        frame_count += 1  # Increment the frame counter

 
    # Calculate the GRGB values of the frames
    # Implementation of the method mentioned in paper by F.Haugg et al.
    gr_signal = [a/b for a, b in zip(avg_g, avg_r)]
    gb_signal = [a/b for a, b in zip(avg_g, avg_b)]
    grgb_signal = [a + b for a , b in zip(gr_signal, gb_signal)]


    # Save all versions of rPPG
    time_limits = ['inf'] #5, 10, 20, 
    max_frames_dict = {t: (t * fps if isinstance(t, int) else total_frames) for t in time_limits}
    for length, max_frames in max_frames_dict.items():
        truncated_signal = grgb_signal[:int(max_frames)]  # Slice signal based on length
        save_rppg_signal(truncated_signal, f'{length}s' if isinstance(length, int) else 'inf', dataset, compression, amount_videos, category, filename)

cap.release()
cv2.destroyAllWindows()
face_mesh.close()

INFO: Created TensorFlow Lite XNNPACK delegate for CPU.
W0000 00:00:1726160194.261776 29658858 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1726160194.263831 29658853 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


: 