# rPPG : Remote photoplethysmography



Advancements in medical technologies have led to the development of contact-free methods of haemodynamic monitoring such as remote photoplethysmography (rPPG). rPPG uses video cameras to interpret variations in skin colour related to blood flow, which are analysed to generate vital signs readings.

## Liveness Detection with Multiple Techniques

### Imports and Initial Setup

In [1]:
!pip3 install tensorflow keras numpy mediapipe scipy opencv-python dlib matplotlib

Defaulting to user installation because normal site-packages is not writeable


In [2]:
# Import necessary libraries
import cv2
import numpy as np
from scipy.signal import butter, filtfilt, find_peaks
import dlib  # For landmark detection
import matplotlib.pyplot as plt


### Optical Flow for Motion Detection

This function computes the optical flow between consecutive frames, a crucial step in detecting motion in video streams.

In [3]:
# Compute optical flow between consecutive frames
def compute_optical_flow(prev_gray, frame_gray):
    flow = cv2.calcOpticalFlowFarneback(prev_gray, frame_gray, None, 0.5, 3, 15, 3, 5, 1.2, 0)
    return flow

### Eye Blink Detection with dlib's Landmarks

This section identifies eye blinks using facial landmarks and computes the eye aspect ratio (EAR) to detect if the eyes are closed.

In [4]:
# Compute eye aspect ratio for blink detection
def eye_aspect_ratio(eye):
    eye = np.array([(point.x, point.y) for point in eye])
    A = np.linalg.norm(eye[1] - eye[5])
    B = np.linalg.norm(eye[2] - eye[4])
    C = np.linalg.norm(eye[0] - eye[3])
    ear = (A + B) / (2.0 * C)
    return ear

# Detect eye blinks based on landmarks
def detect_blinks(landmarks, eye_thresh=0.3):
    left_eye = [landmarks.part(i) for i in range(36, 42)]
    right_eye = [landmarks.part(i) for i in range(42, 48)]
    left_eye_ratio = eye_aspect_ratio(left_eye)
    right_eye_ratio = eye_aspect_ratio(right_eye)
    return left_eye_ratio < eye_thresh or right_eye_ratio < eye_thresh


### Head Pose Estimation

Uses dlib facial landmarks to estimate head pose based on a 3D model of the human face.

In [5]:
def estimate_head_pose(landmarks):
    # 2D image points (for head pose estimation)
    image_points = np.array([
        (landmarks[30].x, landmarks[30].y),  # Nose tip
        (landmarks[8].x, landmarks[8].y),    # Chin
        (landmarks[36].x, landmarks[36].y),  # Left eye left corner
        (landmarks[45].x, landmarks[45].y),  # Right eye right corner
        (landmarks[48].x, landmarks[48].y),  # Left mouth corner
        (landmarks[54].x, landmarks[54].y)   # Right mouth corner
    ], dtype="double")

    # 3D model points for head pose estimation
    model_points = np.array([
        (0.0, 0.0, 0.0),       # Nose tip
        (0.0, -330.0, -65.0),  # Chin
        (-225.0, 170.0, -135.0),  # Left eye left corner
        (225.0, 170.0, -135.0),   # Right eye right corner
        (-150.0, -150.0, -125.0), # Left mouth corner
        (150.0, -150.0, -125.0)   # Right mouth corner
    ])

    # Camera internals
    size = (640, 480)
    focal_length = size[0]
    center = (size[0] / 2, size[1] / 2)
    camera_matrix = np.array([
        [focal_length, 0, center[0]],
        [0, focal_length, center[1]],
        [0, 0, 1]
    ])

    dist_coeffs = np.zeros((4, 1))  # Assuming no lens distortion

    _, rotation_vector, translation_vector = cv2.solvePnP(model_points, image_points, camera_matrix, dist_coeffs)

    return rotation_vector, translation_vector


### Bandpass Filtering for rPPG Signal

Applies a bandpass filter to the remote photoplethysmography (rPPG) signal for heart rate estimation.

In [6]:
def butter_bandpass(lowcut, highcut, fs, order=5):
    nyquist = 0.5 * fs
    low = lowcut / nyquist
    high = highcut / nyquist
    b, a = butter(order, [low, high], btype='band')
    return b, a

def bandpass_filter(data, lowcut=0.7, highcut=4.0, fs=30, order=5):
    if len(data) < 2 * order:
        return data
    b, a = butter_bandpass(lowcut, highcut, fs, order)
    return filtfilt(b, a, data)


### Heart Rate Estimation from rPPG Signal

Extracts the rPPG signal, applies a filter, and estimates the heart rate using peak detection.

In [7]:
def extract_roi(frame, face_cascade):
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(100, 100))
    return faces

def extract_rppg_signal(roi):
    roi = cv2.resize(roi, (100, 100))  # Resize to standard dimensions
    green_channel = roi[:, :, 1]  # Extract the green channel
    rppg_signal = np.mean(green_channel, axis=0)  # Compute mean across rows
    return rppg_signal

def estimate_heart_rate(rppg_signal, fps, window_size=60, lowcut=0.7, highcut=4.0):
    if len(rppg_signal) < window_size:
        return None, None

    windowed_signal = rppg_signal[-window_size:]
    filtered_signal = bandpass_filter(windowed_signal, lowcut=lowcut, highcut=highcut, fs=fps)

    peaks, _ = find_peaks(filtered_signal, distance=15)
    if len(peaks) > 1:
        heart_rate = len(peaks) / (len(filtered_signal) / fps) * 60
        return heart_rate, filtered_signal
    return None, None

def compute_variability(filtered_signal):
    if filtered_signal is None or len(filtered_signal) < 2:
        return 0
    # Variability is the standard deviation of the heart rate signal
    return np.std(filtered_signal)



### Motion Thresholding

This detects motion by comparing consecutive frames

In [8]:
def motion_threshold(roi, previous_roi, motion_threshold=0.01):
    """Calculate motion based on previous ROI and current frame"""
    if previous_roi is None:
        return False

    # Resize both ROIs to the same size
    roi_resized = cv2.resize(roi, (previous_roi.shape[1], previous_roi.shape[0]))

    # Convert both ROIs to grayscale
    roi_gray = cv2.cvtColor(roi_resized, cv2.COLOR_BGR2GRAY)
    previous_roi_gray = cv2.cvtColor(previous_roi, cv2.COLOR_BGR2GRAY)

    # Compute absolute difference between the current and previous frame
    diff = cv2.absdiff(previous_roi_gray, roi_gray)

    # Calculate motion score by summing pixel differences and normalizing by ROI size
    motion_score = np.sum(diff) / float(roi_resized.size)

    return motion_score > motion_threshold


### Detecting Skin texture and subject focus

In [9]:
def analyze_texture(roi):
    gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
    laplacian = cv2.Laplacian(gray, cv2.CV_64F)
    texture_score = laplacian.var()  # Variance of Laplacian
    return texture_score > 30, texture_score  # Lower texture variance threshold for real faces

def analyze_focus(roi):
    gray = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY)
    laplacian = cv2.Laplacian(gray, cv2.CV_64F)
    focus_score = laplacian.var()  # Variance of Laplacian
    return focus_score > 10, focus_score  # Looser focus threshold to allow more real faces


### Liveness Evaluation based on all the variables

By assigning weights to each aspect discussed before we can generate a liveness score and process frames

In [10]:
def evaluate_liveness(heart_rate, variability, motion_detected, texture_score, focus_detected, blink_detected, confidence_threshold=0.6):
    """Evaluate liveness based on heart rate, motion, texture, focus, and blink detection"""
    motion_weight = 0.15
    texture_weight = 0.15
    heart_rate_weight = 0.25
    focus_weight = 0.2
    blink_weight = 0.25

    alive_count = 0
    if motion_detected:
        alive_count += 1
    if texture_score:
        alive_count += 1
    if heart_rate and 50 <= heart_rate <= 100:
        alive_count += 1
    if focus_detected:
        alive_count += 1
    if blink_detected:
        alive_count += 1

    score = (motion_weight * motion_detected) + (texture_weight * texture_score) + (heart_rate_weight * (heart_rate is not None and 50 <= heart_rate <= 100)) + (focus_weight * focus_detected) + (blink_weight * blink_detected)

    if (alive_count >= 3 or score >= 40) and variability > 20 :  # Deepfake variability threshold
        return "Alive", True
    return "Deep Fake", False

### Real-Time Liveness Detection
Captures video from the webcam and applies all the above techniques to detect liveness in real-time.

In [None]:
sum_v = 0
sum_f = 0
sum_t = 0
frame_count = 0

try:
    face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
    cap = cv2.VideoCapture(0)
    fps = 30
    detector = dlib.get_frontal_face_detector()
    predictor = dlib.shape_predictor('shape_predictor_68_face_landmarks.dat')

    if not cap.isOpened():
        raise Exception("Error: Could not open webcam.")

    previous_roi = None
    previous_gray = None
    previous_landmarks = None

    while True:
        ret, frame = cap.read()
        if not ret:
            break

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(100, 100))

        for (x, y, w, h) in faces:
            roi = frame[y:y+h, x:x+w]
            if previous_roi is not None:
                motion_detected = motion_threshold(roi, previous_roi)
            else:
                motion_detected = False

            # Use dlib to detect landmarks and blink detection
            landmarks = predictor(gray, dlib.rectangle(x, y, x+w, y+h))
            blink_detected = detect_blinks(landmarks)
            
            # Estimate heart rate from RPPG signal
            rppg_signal = extract_rppg_signal(roi)
            heart_rate, filtered_signal = estimate_heart_rate(rppg_signal, fps)
            variability = compute_variability(rppg_signal)
            sum_v += variability

            frame_count+=1

            # Analyze texture and focus
            texture_score, texture = analyze_texture(roi)
            sum_t+=texture

            focus_detected, focus = analyze_focus(roi)
            sum_f+=focus

            # Evaluate liveness
            status, is_alive = evaluate_liveness(heart_rate, variability, motion_detected, texture_score, focus_detected, blink_detected)
            print(f"Status: {status}, Heart Rate: {heart_rate}, Variability: {variability}")

            previous_roi = roi
            previous_gray = gray
            previous_landmarks = landmarks

            # Draw rectangle around the face
            color = (0, 255, 0) if is_alive else (0, 0, 255)  # Green for alive, red for deep fake
            cv2.rectangle(frame, (x, y), (x + w, y + h), color, 2)

            # Display heart rate and liveness status on the frame
            # cv2.putText(frame, f"Heart Rate: {heart_rate if heart_rate else 'N/A'} bpm", (x, y - 30),
            #             cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)
            cv2.putText(frame, f"Liveness: {status}", (x, y - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)

            # Display the resulting frame
            cv2.imshow('Frame', frame)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            print (f"average variability: {sum_v/frame_count}")
            print (f"average focus: {sum_f/frame_count}")
            print (f"average texture: {sum_t/frame_count}")

            break

except:
    print("Exception")
finally:
    cap.release()
    cv2.destroyAllWindows()

: 