<a href="https://colab.research.google.com/github/GRodrigues4/rPPG-10/blob/main/rPPG_10.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Packages

In [None]:
# Install all the needed packages
!pip install mediapipe

import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2

In [2]:
# Import all the necessary libraries
import numpy as np
import cv2
from google.colab.patches import cv2_imshow
import scipy.fftpack as fftpack
import scipy.signal as signal
import scipy.interpolate as interpolate
from scipy import stats

In [None]:
# Mount Google Drive so the raw dataset stored there can be read (and results written back)
from google.colab import drive
drive.mount('/content/drive')

# Download the MediaPipe Face Landmarker model (saved locally as face_landmarker_v2_with_blendshapes.task)
!wget -O face_landmarker_v2_with_blendshapes.task -q https://storage.googleapis.com/mediapipe-models/face_landmarker/face_landmarker/float16/1/face_landmarker.task

# Dataset


In [4]:
class SignalProcessing:
  """Extract a fixed-length ECG segment from a raw acquisition text file.

  The file is read with ``np.loadtxt``; column ``ECG_COLUMN`` holds the ECG
  samples and column ``SYNC_COLUMN`` holds the synchronisation input
  (described in the original comments as a sync button), where a value of
  1 marks a press.
  """

  # 0-based column indices inside the acquisition file.
  SYNC_COLUMN = 1
  ECG_COLUMN = 5

  # A candidate start is accepted when the following QUIET_SECONDS contain
  # no sync press; the kept segment then lasts SEGMENT_SECONDS (10 minutes).
  QUIET_SECONDS = 60
  SEGMENT_SECONDS = 60 * 10

  # Candidate start positions are tested every SCAN_STEP samples.
  SCAN_STEP = 50

  def __init__(self, file, sample_rate = 1000):
    """Store the acquisition settings.

    Args:
      file: Path of the text file containing the recorded signals.
      sample_rate: Sampling rate of the recording in Hz (samples per second).
    """
    self.file_name = file
    self.sample_rate = sample_rate


  def cut(self):
    """Return the ECG samples of the first segment that starts press-free.

    Candidate starts are tested every ``SCAN_STEP`` samples. The first one
    whose next ``QUIET_SECONDS`` seconds contain no sync value equal to 1 is
    selected and up to ``SEGMENT_SECONDS`` seconds of ECG are returned from
    there. A window that runs past the end of the recording is judged on the
    samples that exist. If no candidate qualifies, the whole ECG column is
    returned unchanged.

    Returns:
      list of float: The selected ECG samples (empty for an empty file).
    """
    # ndmin=2 keeps a one-row file two-dimensional so column slicing works.
    data = np.loadtxt(self.file_name, ndmin = 2)
    if data.size == 0:
      return []

    ecg = data[:, self.ECG_COLUMN]
    pressed = data[:, self.SYNC_COLUMN] == 1

    quiet_len = int(self.QUIET_SECONDS * self.sample_rate)
    segment_len = int(self.SEGMENT_SECONDS * self.sample_rate)

    # presses_before[i] = number of presses in samples [0, i). The number of
    # presses inside any window is then a simple difference, so each
    # candidate is checked in O(1) instead of re-scanning the window.
    presses_before = np.concatenate(([0], np.cumsum(pressed)))
    starts = np.arange(0, len(ecg), self.SCAN_STEP)
    ends = np.minimum(starts + quiet_len, len(ecg))
    is_quiet = presses_before[ends] == presses_before[starts]

    if is_quiet.any():
      start = int(starts[np.argmax(is_quiet)])  # argmax -> first True entry
      ecg = ecg[start:start + segment_len]

    return ecg.tolist()


  def run(self, output_file = "output_file.txt"):
    """Cut the recording and save the selected ECG segment.

    The segment is written with ``np.save`` to ``output_file + '_ECG'``
    (NumPy appends the ``.npy`` extension).

    Args:
      output_file: Path prefix of the saved array.
    """
    ECG = self.cut()  # Select the press-free segment.
    np.save(output_file + '_ECG', ECG)  # Save the ECG signal

In [5]:
class VideoSegmentation:
  """Crop three facial regions from a video and save each as its own clip.

  For every frame a MediaPipe face landmarker locates three landmarks
  (forehead, cheek 1 and cheek 2), a square box is cut around each of them
  and the crops are written to three separate XVID ``.avi`` files.
  """

  # Model file expected in the working directory.
  MODEL_PATH = 'face_landmarker_v2_with_blendshapes.task'

  # Landmark indices used as ROI centres: forehead, cheek 1, cheek 2.
  LANDMARK_IDS = (151, 50, 280)

  # Vertical shift (pixels) applied to every bounding box.
  Y_OFFSET = 5

  # Processing stops once this many seconds of video have been read.
  MAX_SECONDS = 60 * 10

  def __init__(self, file_path, fps = 30, image_width = 1280,
               image_height = 720, bounding_box_size = 64):
    """Store the video settings.

    Args:
      file_path: Path to the video file.
      fps: Frames per second of the video.
      image_width: Width of the processed frame in pixels.
      image_height: Height of the processed frame in pixels.
      bounding_box_size: Side of the square box around each landmark.
    """
    self.file_path = file_path
    self.fps = fps
    self.image_width = image_width
    self.image_height = image_height
    self.box_size = bounding_box_size
    # Last successfully detected ROI centres; reused when detection fails.
    self.coordinates = []
    # Landmarker shared by all frames of a run (created on first use).
    self._detector = None
    self._last_timestamp_ms = None


  def relative_to_absolute(self, relative_coords):
    """Convert normalised (x, y) pairs to integer pixel coordinates.

    Args:
      relative_coords: Iterable of (x, y) pairs, x scaled by the frame width
        and y by the frame height.

    Returns:
      list of [x, y] lists in pixels (truncated with ``int``).
    """
    absolute_coords = [[int(coord[0] * self.image_width), int(coord[1] * self.image_height)] for coord in relative_coords]

    return absolute_coords


  def _get_detector(self):
    # Build the face landmarker once and hand back the shared instance.
    if self._detector is None:
      base_options = python.BaseOptions(model_asset_path = self.MODEL_PATH)
      options = vision.FaceLandmarkerOptions(base_options = base_options, running_mode = mp.tasks.vision.RunningMode.VIDEO)
      self._detector = vision.FaceLandmarker.create_from_options(options)
      self._last_timestamp_ms = None
    return self._detector


  def _close_detector(self):
    # Release the shared landmarker (if any) and forget its timestamp state.
    if self._detector is not None:
      self._detector.close()
      self._detector = None
    self._last_timestamp_ms = None


  def segment_frame(self, frame, time_ms):
    """Detect the face in ``frame`` and return the three ROI centres.

    The landmarker is created on the first call and reused afterwards
    instead of being rebuilt (model load included) for every frame.

    Args:
      frame: BGR image as returned by OpenCV.
      time_ms: Timestamp of the frame in milliseconds.

    Returns:
      list of [x, y] pixel coordinates for forehead, cheek 1 and cheek 2.

    Raises:
      IndexError: If no face is found in the frame.
    """
    detector = self._get_detector()

    # Convert the frame to RGB as the model expects RGB input.
    frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame_rgb)

    # A reused VIDEO-mode landmarker needs increasing timestamps, so bump
    # any repeated or out-of-order value past the previous one.
    timestamp_ms = round(time_ms)
    if self._last_timestamp_ms is not None and timestamp_ms <= self._last_timestamp_ms:
      timestamp_ms = self._last_timestamp_ms + 1
    self._last_timestamp_ms = timestamp_ms

    result = detector.detect_for_video(mp_image, timestamp_ms)

    landmarks = result.face_landmarks[0]  # IndexError when no face was detected.
    coord = [(landmarks[idx].x, landmarks[idx].y) for idx in self.LANDMARK_IDS]
    coord = self.relative_to_absolute(coord)

    return coord


  def create_bounding_box(self, coord):
    """Return the box around ``coord`` clamped to the frame.

    Args:
      coord: (x, y) centre in pixels.

    Returns:
      ((x_min, y_min), (x_max, y_max)) as integer tuples. The box is shifted
      down by ``Y_OFFSET`` pixels and is smaller than ``box_size`` where it
      is clipped by the frame border.
    """
    x_coord = coord[0]
    y_coord = coord[1]
    half = self.box_size // 2

    # Calculate the bounding box limits ensuring they stay within frame boundaries.
    x_min = max(0, x_coord - half)
    y_min = max(0, self.Y_OFFSET + (y_coord - half))
    x_max = min(self.image_width, x_coord + half)
    y_max = min(self.image_height, self.Y_OFFSET + (y_coord + half))

    return (int(x_min), int(y_min)), (int(x_max), int(y_max))


  def cut_video(self, frame, coord):
    """Crop ``frame`` to the bounding box around ``coord``.

    Returns:
      The cropped region (a view into ``frame``).
    """
    top_left, bottom_right = self.create_bounding_box(coord)
    segmented_frame = frame[top_left[1]:bottom_right[1], top_left[0]:bottom_right[0]]

    return segmented_frame


  def run(self, output_path = "output_video"):
    """Segment the video and write one clip per facial region.

    Writes ``output_path + '_Forehead_.avi'``, ``'_Cheek1_.avi'`` and
    ``'_Cheek2_.avi'``. Processing stops after ``MAX_SECONDS`` seconds worth
    of frames (``fps * MAX_SECONDS``) or when the video ends, whichever
    comes first. When no face is found in a frame the previous ROI position
    is reused; frames before the first successful detection are skipped.

    Args:
      output_path: Path prefix of the three output clips.

    Raises:
      ValueError: If the video file cannot be opened.
    """
    # Open the video file using OpenCV.
    cap = cv2.VideoCapture(self.file_path)
    if not cap.isOpened():
      raise ValueError("Error opening video file")

    # Crops are 2 * (box_size // 2) pixels wide, so size the writers to match.
    # NOTE(review): crops clipped at the frame border are smaller than this;
    # confirm how the writer handles them.
    side = 2 * (self.box_size // 2)
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    writers = [cv2.VideoWriter(output_path + suffix, fourcc, self.fps, (side, side), 1)
               for suffix in ('_Forehead_.avi', '_Cheek1_.avi', '_Cheek2_.avi')]

    max_frames = self.fps * self.MAX_SECONDS
    expected_shape = (self.image_height, self.image_width)

    # Start every run with a fresh landmarker and no remembered ROI.
    self.coordinates = []
    self._close_detector()

    try:
      # Process each frame of the video.
      while cap.isOpened():
        ret, frame = cap.read()

        # Stop cleanly when the video has ended (or a frame cannot be read).
        if not ret:
          print(f"Video ended after {cap.get(cv2.CAP_PROP_POS_FRAMES)} frames")
          break

        time_ms = cap.get(cv2.CAP_PROP_POS_MSEC)  # Timestamp in milliseconds.
        frame_pos = cap.get(cv2.CAP_PROP_POS_FRAMES)

        # Resize the frame if its dimensions don't match the expected ones.
        if frame.shape[:2] != expected_shape:
          frame = cv2.resize(frame, (self.image_width, self.image_height))

        try:
          # Detect and get facial landmark coordinates.
          self.coordinates = self.segment_frame(frame, time_ms)
        except IndexError:
          # No face found: keep the previous ROI so the frame is not dropped.
          pass

        if self.coordinates:
          for writer, coord in zip(writers, self.coordinates):
            writer.write(cv2.convertScaleAbs(np.array(self.cut_video(frame, coord))))

        # Print the progress for every 1000th frame.
        if frame_pos % 1000 == 0:
          print(f"Frame {frame_pos}")

        # Stop once the 10-minute limit is reached, even on frames without a face.
        if frame_pos >= max_frames:
          break

      print(output_path + '. . . . . . . Done\n')

    finally:
      # Always flush the output files and free the capture and the landmarker.
      for writer in writers:
        writer.release()
      cap.release()
      self._close_detector()

In [None]:
def CreateDataset(array_like,
                  in_root = '/content/drive/My Drive/Data Tese',
                  out_root = '/content/drive/My Drive/Dataset_rPPG-10'):
  """Run the signal and video processing for every listed subject.

  For subject ``n`` the inputs are ``<in_root>/Subject_n/Subject_n.txt`` and
  ``.mp4``; the outputs share the prefix ``<out_root>/Subject_n/Subject_n``
  (the ECG array saved by ``SignalProcessing.run`` and the three region
  clips written by ``VideoSegmentation.run``).

  Args:
    array_like: Iterable of subject identifiers.
    in_root: Folder containing the raw ``Subject_<id>`` folders.
    out_root: Folder receiving the processed ``Subject_<id>`` folders.
  """
  import os  # Local import: only needed to create the output folders.

  # Iterate through the given list.
  for subject_num in array_like:
    subject = 'Subject_' + str(subject_num)
    in_path = in_root + '/' + subject + '/' + subject  # Define file paths.
    out_path = out_root + '/' + subject + '/' + subject

    # Make sure the subject's output folder exists before anything is written.
    os.makedirs(os.path.dirname(out_path), exist_ok = True)

    # Apply image and signal processing functions defined previously
    SignalProcessing(in_path + '.txt').run(output_file = out_path)
    VideoSegmentation(in_path + '.mp4').run(output_path = out_path)

# Subjects to process; each entry selects the matching 'Subject_<id>' folder.
List = [4]

# Build the dataset for the selected subjects.
CreateDataset(array_like = List)