In [None]:
!pip install mediapipe

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import cv2
import mediapipe as mp
import numpy as np



In [8]:
video_path = "/content/drive/MyDrive/rPPG/input_1.mp4"     # your input video
save_path = "/content/drive/MyDrive/rPPG/rppg_raw_landmarks.npz"
max_frames = None                  # e.g. 1000 to limit, or None for all
show_debug = True                  # show frame with circles
min_face_eye_dist = 60             # skip very small faces (in pixels)
roi_radius_factor = 0.1           # ROI radius = this * inter-eye distance

In [10]:
def lm_to_px(lm, w, h):
    """Convert one normalized landmark to integer pixel coords."""
    x = int(lm.x * w)
    y = int(lm.y * h)
    return x, y


def get_circular_roi_mean_rgb(frame_rgb, cx, cy, radius):
    """Return mean [R,G,B] in a circular ROI centered at (cx,cy)."""
    h, w, _ = frame_rgb.shape
    # bounding box of the circle
    x1 = max(cx - radius, 0)
    x2 = min(cx + radius, w - 1)
    y1 = max(cy - radius, 0)
    y2 = min(cy + radius, h - 1)

    if x2 <= x1 or y2 <= y1:
        return None

    # build mask for the circle
    yy, xx = np.ogrid[y1:y2, x1:x2]
    dist_sq = (xx - cx) ** 2 + (yy - cy) ** 2
    mask = dist_sq <= radius ** 2

    roi_pixels = frame_rgb[y1:y2, x1:x2, :][mask]
    if roi_pixels.size == 0:
        return None

    return roi_pixels.mean(axis=0)  # [R,G,B]

def combine_rois_rgb(forehead_list, left_cheek_list, right_cheek_list,
                     normalize_per_roi=True):
    """
    Combine multiple ROI RGB signals from forehead, left cheek,
    and right cheek into a single face RGB signal.

    Parameters
    ----------
    forehead_list : list of arrays, each (N,3)
    left_cheek_list : list of arrays, each (N,3)
    right_cheek_list : list of arrays, each (N,3)
    normalize_per_roi : bool
        Whether to normalize each ROI individually.

    Returns
    -------
    rgb_mean : array (N,3)
        The combined face RGB signal.
    """
    # Merge all ROI lists into one flat list
    all_rois = forehead_list + left_cheek_list + right_cheek_list

    rgb_all = []

    for roi in all_rois:
        rgb = roi.astype(np.float64)

        if normalize_per_roi:
            mu = rgb.mean(axis=0)
            sigma = rgb.std(axis=0) + 1e-8
            rgb = (rgb - mu) / sigma

        rgb_all.append(rgb)

    # Stack into (num_rois, N, 3)
    rgb_all = np.stack(rgb_all, axis=0)

    # Average across ROIs
    rgb_mean = rgb_all.mean(axis=0)  # (N,3)

    return rgb_mean

In [6]:
cap = cv2.VideoCapture(video_path)
if not cap.isOpened():
    raise RuntimeError(f"Cannot open video: {video_path}")

fps = cap.get(cv2.CAP_PROP_FPS)
print("Video FPS:", fps)

Video FPS: 30.00285032591299


In [7]:
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(
    static_image_mode=False,
    max_num_faces=1,
    refine_landmarks=False,
    min_detection_confidence=0.5,
    min_tracking_confidence=0.5,
)

In [None]:
# landmark indices we use
FOREHEAD_IDX = [10,109,108,107,9,336,337,338]
LEFT_CHEEK_IDX = [118, 119, 100, 126, 209, 49, 129, 203, 205, 50]
RIGHT_CHEEK_IDX = [347, 348, 329, 355, 429, 279, 358, 423, 425, 280]
LEFT_EYE_OUTER = 33
RIGHT_EYE_OUTER = 263

timestamps = []
forehead_rgb = []
left_cheek_rgb = []
right_cheek_rgb = []
combined_rgb = []

frame_idx = 0

while True:
    ret, frame_bgr = cap.read()
    if not ret:
        break
    if max_frames is not None and frame_idx >= max_frames:
        break

    frame_idx += 1
    h, w, _ = frame_bgr.shape

    # BGR -> RGB for MediaPipe
    frame_rgb = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
    results = face_mesh.process(frame_rgb)

    if not results.multi_face_landmarks:
        continue

    face_lms = results.multi_face_landmarks[0].landmark

    # --- compute scale: inter-eye distance ---
    lx, ly = lm_to_px(face_lms[LEFT_EYE_OUTER], w, h)
    rx, ry = lm_to_px(face_lms[RIGHT_EYE_OUTER], w, h)
    eye_dist = np.hypot(rx - lx, ry - ly)

    if eye_dist < min_face_eye_dist:
        # face too small for reliable rPPG
        continue

    radius = int(roi_radius_factor * eye_dist)
    if radius < 3:
        continue

    for idxf in FOREHEAD_IDX:
        # --- ROI centers from landmarks ---
      fh_cx, fh_cy = lm_to_px(face_lms[idxf], w, h)

      # --- compute mean RGB in circular ROIs ---
      fh_mean = get_circular_roi_mean_rgb(frame_rgb, fh_cx, fh_cy, radius)

      if fh_mean is None:
          continue

      forehead_rgb.append(fh_mean)

    for idxl in LEFT_CHEEK_IDX:
        # --- ROI centers from landmarks ---
      lc_cx, lc_cy = lm_to_px(face_lms[idxl], w, h)

      # --- compute mean RGB in circular ROIs ---
      lc_mean = get_circular_roi_mean_rgb(frame_rgb, lc_cx, lc_cy, radius)

      if lc_mean is None:
          continue

      left_cheek_rgb.append(lc_mean)

    for idxr in LEFT_CHEEK_IDX:
        # --- ROI centers from landmarks ---
      rc_cx, rc_cy = lm_to_px(face_lms[idxr], w, h)

      # --- compute mean RGB in circular ROIs ---
      rc_mean = get_circular_roi_mean_rgb(frame_rgb, rc_cx, rc_cy, radius)

      if rc_mean is None:
          continue

      right_cheek_rgb.append(rc_mean)

    combined_rgb = combine_rois_rgb(forehead_rgb, left_cheek_rgb, right_cheek_rgb)

    t = frame_idx / fps if fps > 0 else frame_idx
    timestamps.append(t)

cap.release()
cv2.destroyAllWindows()
face_mesh.close()

# --- save data ---
timestamps = np.array(timestamps)
forehead_rgb = np.vstack(forehead_rgb)
left_cheek_rgb = np.vstack(left_cheek_rgb)
right_cheek_rgb = np.vstack(right_cheek_rgb)

print("Collected frames:", len(timestamps))
print("Saving to:", save_path)

np.savez(
    save_path,
    timestamps=timestamps,
    fps=fps,
    forehead=forehead_rgb,
    left_cheek=left_cheek_rgb,
    right_cheek=right_cheek_rgb,
)

In [None]:
cap = cv2.VideoCapture("input.mp4")

while True:
    ret, frame = cap.read()
    if not ret:
        break

    green = frame[:, :, 1]

    # Recreate an image that only has green values
    green_img = np.zeros_like(frame)
    green_img[:, :, 1] = green  # Only G channel active

    cv2.imshow("Green", green_img)

    if cv2.waitKey(1) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()