In [1]:
# --- Standard Library ---
import os
import glob

# --- Third-party Libraries ---
import cv2
import numpy as np
from tqdm import tqdm
import mediapipe as mp

# --- PyTorch ---
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader


# Short alias for MediaPipe's face-detection solution module; the preprocessing
# function below builds its FaceDetection context manager from this name.
mp_face_detection = mp.solutions.face_detection

In [None]:
def sync_bvp_with_frames(bvp_trace, bvp_time, total_frames, fps):
    """
    Linearly interpolate a BVP trace onto the video's frame timestamps.

    Frame ``i`` is placed at ``i / fps`` seconds. The BVP samples
    (``bvp_trace`` measured at ``bvp_time``) are interpolated with
    ``np.interp``, so frame times outside the range of ``bvp_time`` take the
    first/last BVP value.

    Returns:
        1-D array with one BVP value per frame (length ``total_frames``).
    """
    frame_numbers = np.arange(total_frames)
    return np.interp(frame_numbers / fps, bvp_time, bvp_trace)

def process_video_mediapipe(video_path, bvp_path, save_dir, clip_len=150):
    """
    Cut a video into fixed-length face-crop clips with frame-aligned BVP labels.

    For every frame in which MediaPipe detects a face, the first detection is
    cropped from the original frame (OpenCV channel order) and resized to
    64x64. Each time ``clip_len`` crops have been collected they are written to
    ``save_dir`` as ``clip_{i}_frames.npy`` together with ``clip_{i}_bvp.npy``.

    The BVP label for a crop is the interpolated BVP value at the index of the
    frame the crop came from. Frames without a usable detection are skipped,
    so a clip's frames are not necessarily consecutive; indexing the BVP by the
    recorded frame indices keeps labels and crops in step regardless.

    Args:
        video_path: Path of the video file to read.
        bvp_path: Path of a text file readable by ``np.loadtxt``. A 2-D file is
            treated as the 3 x N layout (row 0 = BVP trace, row 2 = timestamps
            in seconds); a 1-D file is treated as a BVP trace sampled at the
            video frame rate.
        save_dir: Directory that receives the ``.npy`` clips (created if missing).
        clip_len: Number of face crops per saved clip.

    Returns:
        None. Prints a summary line, or a warning if the video cannot be read.
    """
    os.makedirs(save_dir, exist_ok=True)
    cap = cv2.VideoCapture(video_path)
    clip_idx = 0

    # try/finally so the capture handle is released even if loading the BVP
    # file or the detector raises.
    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Frame timestamps are computed as index / fps, so an unopened video or
        # an FPS of 0 would only produce NaN/inf timestamps; bail out instead.
        if not cap.isOpened() or fps <= 0:
            print(f"[WARNING] Could not read video (or its FPS): {video_path}")
            return

        # Load and sync BVP
        bvp_raw = np.loadtxt(bvp_path)
        if bvp_raw.ndim == 2:  # DATASET_2 format (3 x N)
            bvp_trace = bvp_raw[0, :]
            bvp_time = bvp_raw[2, :]  # seconds
        else:  # Fallback if already 1D
            bvp_trace = bvp_raw
            bvp_time = np.arange(len(bvp_trace)) / fps

        synced_bvp = sync_bvp_with_frames(bvp_trace, bvp_time, total_frames, fps)

        # `frame_ids[k]` is the video frame index that `frames[k]` was cropped from.
        frames, frame_ids, frame_idx = [], [], 0

        with mp_face_detection.FaceDetection(model_selection=0, min_detection_confidence=0.6) as face_detection:
            with tqdm(total=total_frames, desc=f"Processing {os.path.basename(save_dir)}") as pbar:
                while True:
                    ret, frame = cap.read()
                    if not ret:
                        break

                    # The detector is fed an RGB copy; the crop is taken from `frame`.
                    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    results = face_detection.process(rgb)

                    if results.detections:
                        box = results.detections[0].location_data.relative_bounding_box
                        h, w, _ = frame.shape
                        x0, y0 = int(box.xmin * w), int(box.ymin * h)
                        x1, y1 = x0 + int(box.width * w), y0 + int(box.height * h)
                        # Clamp both corners at 0: the far edge is computed from the
                        # unclamped origin so a box starting off-frame is cut, not
                        # shifted, and negative values never wrap around in slicing.
                        face = frame[max(0, y0):max(0, y1), max(0, x0):max(0, x1)]

                        if face.size != 0:
                            frames.append(cv2.resize(face, (64, 64)))
                            frame_ids.append(frame_idx)

                    # Save synced clips
                    if len(frames) == clip_len:
                        # The reported frame count can be smaller than the number of
                        # frames actually decoded; skip clips that run past the
                        # interpolated BVP instead of indexing out of range.
                        if frame_ids[-1] < len(synced_bvp):
                            np.save(os.path.join(save_dir, f"clip_{clip_idx}_frames.npy"), np.array(frames))
                            np.save(os.path.join(save_dir, f"clip_{clip_idx}_bvp.npy"), synced_bvp[frame_ids])
                            clip_idx += 1
                        frames, frame_ids = [], []

                    frame_idx += 1
                    pbar.update(1)
    finally:
        cap.release()

    print(f"[INFO] Processed {clip_idx} clips from {video_path}")

def process_all_subjects(ubfc_dir, save_root="processed_data"):
    """
    Run the video preprocessing for every numbered subject folder in ``ubfc_dir``.

    Entries whose name contains at least one digit are visited in ascending
    order of the integer formed by all digits in the name. Entries that are
    not directories are ignored. A subject folder must contain ``vid.avi`` and
    ``ground_truth.txt``; otherwise a warning is printed and it is skipped.
    Output for each subject goes to ``save_root/<subject folder name>``.
    """
    os.makedirs(save_root, exist_ok=True)

    # Map entry name -> numeric id; dict order keeps the listing order, so the
    # stable sort below resolves equal ids the same way the listing did.
    numeric_id = {}
    for entry in os.listdir(ubfc_dir):
        digit_chars = "".join(ch for ch in entry if ch.isdigit())
        if digit_chars:
            numeric_id[entry] = int(digit_chars)

    for name in sorted(numeric_id, key=numeric_id.get):
        folder = os.path.join(ubfc_dir, name)
        if os.path.isdir(folder):
            video_file = os.path.join(folder, "vid.avi")
            truth_file = os.path.join(folder, "ground_truth.txt")

            if not (os.path.exists(video_file) and os.path.exists(truth_file)):
                print(f"[WARNING] Missing files for {name}")
                continue

            process_video_mediapipe(video_file, truth_file, os.path.join(save_root, name))

# Preprocess every subject found in the UBFC-RPPG dataset folder
# (clips are written under the default "processed_data" directory).
process_all_subjects(ubfc_dir="UBFC-RPPG")

In [None]:
# Dataset and DataLoader for the preprocessed clips
class RPPGDataset(Dataset):
    """
    Dataset of (frames, bvp) clip pairs stored as ``.npy`` files.

    ``root`` is scanned one level deep: every sub-directory is searched for
    files whose name contains ``"frames"``. The matching label file is the same
    file name with ``"frames"`` replaced by ``"bvp"``, in the same directory.
    Frame files without a label file are ignored.

    Attributes set in ``__init__``:
        samples: list of ``(frame_path, bvp_path)`` tuples, in sorted order.
    """

    def __init__(self, root):
        self.samples = []
        # Sorted listings make the sample order reproducible across runs and
        # file systems (os.listdir order is arbitrary).
        for subj in sorted(os.listdir(root)):
            subj_path = os.path.join(root, subj)
            if not os.path.isdir(subj_path):
                continue
            for fname in sorted(os.listdir(subj_path)):
                if "frames" not in fname:
                    continue
                frame_path = os.path.join(subj_path, fname)
                # Substitute in the file name only: replacing on the full path
                # would also rewrite any directory that contains "frames" and
                # silently drop every sample.
                bvp_path = os.path.join(subj_path, fname.replace("frames", "bvp"))
                if os.path.exists(bvp_path):
                    self.samples.append((frame_path, bvp_path))

    def __len__(self):
        """Return the number of (frames, bvp) pairs found."""
        return len(self.samples)

    def __getitem__(self, idx):
        """
        Load one clip.

        Returns:
            frames: float32 tensor with the stored array's last axis moved to
                the front, i.e. (T, H, W, C) on disk becomes (C, T, H, W),
                with values divided by 255.
            bvp: float32 tensor of the stored BVP array.
        """
        frame_path, bvp_path = self.samples[idx]
        frames = np.load(frame_path)          # (T, H, W, C) as written by the preprocessing step
        bvp = np.load(bvp_path)               # (T,)

        frames = frames.transpose(3, 0, 1, 2) # channels first: (C, T, H, W)
        frames = torch.tensor(frames, dtype=torch.float32) / 255.0
        bvp = torch.tensor(bvp, dtype=torch.float32)

        return frames, bvp

# Build the dataset over the preprocessed clips and a shuffling mini-batch loader.
dataset = RPPGDataset(root="processed_data")
loader = DataLoader(dataset=dataset, batch_size=4, shuffle=True)
print(f"Total Clips: {len(dataset)}")


In [None]:
# 3D-CNN PhysNet definition
class ConvBlock3D(nn.Module):
    """3-D convolution followed by batch normalisation and an in-place ReLU."""

    def __init__(self, in_channels, out_channels, kernel_size=(3,3,3), stride=(1,1,1), padding=(1,1,1)):
        super().__init__()
        self.conv = nn.Conv3d(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=padding,
        )
        self.bn = nn.BatchNorm3d(num_features=out_channels)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        """Apply conv -> batch norm -> ReLU to ``x`` and return the result."""
        convolved = self.conv(x)
        normalised = self.bn(convolved)
        return self.relu(normalised)


class ResidualBlock3D(nn.Module):
    """Two channel-preserving ConvBlock3D layers wrapped in a skip connection."""

    def __init__(self, channels):
        super().__init__()
        self.conv1 = ConvBlock3D(in_channels=channels, out_channels=channels)
        self.conv2 = ConvBlock3D(in_channels=channels, out_channels=channels)

    def forward(self, x):
        """Return ``conv2(conv1(x)) + x``."""
        # conv1 produces a new tensor, so the in-place ReLUs inside the blocks
        # never overwrite `x` before it is added back.
        transformed = self.conv2(self.conv1(x))
        return transformed + x


class PhysNet3D(nn.Module):
    """
    3-D convolutional encoder/decoder that maps a video clip to a 1-D signal.

    Every layer uses temporal stride 1, so the time axis length is unchanged
    from input to output. The encoder halves H and W three times (spatial
    stride 2); the decoder's transposed convolutions double them three times.
    The final 1-channel map is averaged over H and W.

    Input:  (B, 3, T, H, W). Output: (B, 1, T).
    """

    def __init__(self):
        super().__init__()

        # Encoder: full-resolution stem, then three stride-(1,2,2) stages, each
        # followed by a residual block.
        self.block1 = ConvBlock3D(3, 32, kernel_size=(3,5,5), stride=(1,1,1), padding=(1,2,2))
        self.block2 = ConvBlock3D(32, 64, kernel_size=(3,5,5), stride=(1,2,2), padding=(1,2,2))
        self.res2 = ResidualBlock3D(64)
        self.block3 = ConvBlock3D(64, 128, kernel_size=(3,3,3), stride=(1,2,2), padding=(1,1,1))
        self.res3 = ResidualBlock3D(128)
        self.block4 = ConvBlock3D(128, 256, kernel_size=(3,3,3), stride=(1,2,2), padding=(1,1,1))
        self.res4 = ResidualBlock3D(256)

        # Decoder: spatial-only upsampling (temporal kernel 1, stride 1).
        def spatial_upsample(c_in, c_out):
            return nn.ConvTranspose3d(c_in, c_out, kernel_size=(1,4,4), stride=(1,2,2), padding=(0,1,1))

        self.deconv3 = spatial_upsample(256, 128)
        self.deconv2 = spatial_upsample(128, 64)
        self.deconv1 = spatial_upsample(64, 32)

        # Head: 1x1x1 convolution down to a single channel.
        self.out_conv = nn.Conv3d(32, 1, kernel_size=(1,1,1))

    def forward(self, x):
        """Run the encoder, decoder and head; return the per-frame signal (B, 1, T)."""
        feats = self.block1(x)
        encoder_stages = (
            (self.block2, self.res2),
            (self.block3, self.res3),
            (self.block4, self.res4),
        )
        for downsample, residual in encoder_stages:
            feats = residual(downsample(feats))

        for upsample in (self.deconv3, self.deconv2, self.deconv1):
            feats = F.relu(upsample(feats))

        signal_map = self.out_conv(feats)   # (B, 1, T, H, W)
        return signal_map.mean(dim=[3, 4])  # spatial average -> (B, 1, T)


In [None]:
# Loss function: negative Pearson correlation
def neg_pearson_loss(pred, target, eps=1e-8):
    """
    Negative Pearson correlation loss, averaged over the batch.

    Args:
        pred: Predicted signal, shape (B, 1, T) or (B, T).
        target: Reference signal, same shape convention as ``pred``.
        eps: Small constant added under the square root for numerical safety.

    Returns:
        Scalar tensor ``1 - mean(r)`` where ``r`` is the per-sample Pearson
        correlation along dim 1 (after squeezing the channel axis). A sample
        whose prediction or target is constant contributes ``r = 0``.
    """
    pred = pred.squeeze(1)
    target = target.squeeze(1)

    vx = pred - torch.mean(pred, dim=1, keepdim=True)
    vy = target - torch.mean(target, dim=1, keepdim=True)

    covariance = torch.sum(vx * vy, dim=1)
    energy = torch.sum(vx**2, dim=1) * torch.sum(vy**2, dim=1)

    # eps goes INSIDE the sqrt: the derivative of sqrt at 0 is infinite, so a
    # constant prediction or target (energy == 0) would otherwise turn the
    # whole gradient into NaN even though the loss value itself is finite.
    cost = covariance / torch.sqrt(energy + eps)
    return 1 - cost.mean()  # 1 - Pearson correlation

In [None]:
# Training loop: optimise PhysNet3D on `loader` with the negative Pearson loss.
# Reads the notebook-level names `loader`, `PhysNet3D` and `neg_pearson_loss`
# defined in earlier cells.
device = "cuda" if torch.cuda.is_available() else "cpu"
model = PhysNet3D().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
criterion = neg_pearson_loss  # Use Negative Pearson Loss

EPOCHS = 15

# NOTE(review): no random seed is set and no checkpoint is written in this cell,
# so the trained weights only live in `model` for the kernel's lifetime.
for epoch in range(EPOCHS):
    # Sum of per-batch losses for this epoch; averaged after the inner loop.
    running_loss = 0.0

    # Wrap DataLoader with tqdm
    pbar = tqdm(loader, desc=f"Epoch [{epoch+1}/{EPOCHS}]", unit="batch")
    
    for frames, bvp in pbar:
        frames, bvp = frames.to(device), bvp.to(device)

        # Forward pass
        pred = model(frames)  # (B,1,T)
        # The dataset yields bvp as (B, T); add a channel axis to mirror pred.
        loss = criterion(pred, bvp.unsqueeze(1))

        # Backprop
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        # Update TQDM bar with live loss
        pbar.set_postfix({"loss": f"{loss.item():.4f}"})

    # NOTE(review): divides by the number of batches; an empty loader would
    # raise ZeroDivisionError here.
    print(f"Epoch [{epoch+1}/{EPOCHS}] Avg Loss: {running_loss/len(loader):.4f}")
