<a href="https://colab.research.google.com/github/ateffal/AlgoDesign/blob/master/src/task-3-model-development/scripts/Omdena_Sonocare_generate_frames_for_SPO2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# The purpose of this notebook is to calculate rPPG and SPO2 for all videos in African_Skintones_VBPE_dataset_filtered_videos

In [53]:
!pip install -q mediapipe opencv-python

In [54]:
import cv2
import mediapipe as mp
import numpy as np
import os
from google.colab.patches import cv2_imshow
import pandas as pd
import matplotlib.pyplot as plt
from scipy.signal import butter, lfilter, filtfilt
from PIL import Image, ImageEnhance
from skimage import io, exposure
from skimage.color import rgb2gray, gray2rgb

In [55]:
# mount drive
from google.colab import drive
# drive.mount('/content/drive')

# Helpers functions for Image Enhancement

In [56]:
def denoise_image(image):
  # Apply Non-Local Means Denoising
  denoised_image = cv2.fastNlMeansDenoisingColored(image, None, 10, 10, 7, 21)
  return denoised_image

def sharpen_image(image, kernel = np.array([[0, -1, 0], [-1, 5, -1], [0, -1, 0]])):
  # Apply the kernel to the image
  sharpened_image = cv2.filter2D(image, -1, kernel)
  return sharpened_image

def enhance_image(image, grightess_ratio=1.5, contrast_ratio=1.5):

  # # Convert the image to grayscale
  gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

  # # Apply histogram equalization
  equalized_image = cv2.equalizeHist(gray)

  # Convert back to RGB
  equalized_image = gray2rgb(equalized_image)

  return equalized_image

###############################################################################

def enhance_brightness_contrast(image, grightess_ratio=1.5, contrast_ratio=1.5):
  # Enhance brightness
  enhancer = ImageEnhance.Brightness(image)
  bright_image = enhancer.enhance(grightess_ratio)  # Increase brightness by 50% if grightess_ratio = 1.5

  # Enhance contrast
  enhancer = ImageEnhance.Contrast(bright_image)
  contrast_image = enhancer.enhance(contrast_ratio)  # # Increase constrat by 50% if contrast_ratio = 1.5

  return contrast_image


In [57]:
def butter_bandpass(lowcut, highcut, fs, order=5):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    return b, a

def butter_bandpass_filter(data, lowcut, highcut, fs, order=5):
    b, a = butter_bandpass(lowcut, highcut, fs, order=order)
    y = lfilter(b, a, data)
    return y

# Example usage (replace with your signal and parameters):
# pos_filtered = butter_bandpass_filter(pos_signal, 0.5, 5.0, 30)

In [58]:
def chrom_rppg(frames):
    """
    Extract rPPG signals from video frames using the CHROM method.

    """
    if len(frames) < 10:  # Ensure enough frames for filtering - for testing purpose put it to 10. Was 33
        print(f"❌ Not enough frames ({len(frames)}) for {roi}. Skipping...")
        return None

    Xs, Ys, Zs = [], [], []

    for frame in frames:
        h, w, _ = frame.shape

        # Extract Region of Interest (ROI) for Face or Hand
        roi_box = frame # frame[int(h * 0.2):int(h * 0.4), int(w * 0.3):int(w * 0.7)]  # Face region - but here we assume that frames are already faces

        b, g, r = cv2.split(roi_box)
        Xs.append(np.mean(r))
        Ys.append(np.mean(g))
        Zs.append(np.mean(b))

    Xs, Ys, Zs = np.array(Xs), np.array(Ys), np.array(Zs)

    # CHROM method: Linear combination
    rppg_signal = 3 * Xs - 2 * Ys # formula uses red and green, what about blue ?

    rppg_filtered = butter_bandpass_filter(rppg_signal, 0.5, 5.0, 30) # need to dig in the last three parameters !

    return rppg_filtered

In [59]:
def split_cap(video_name, videos_path, frames_path, time_step = 3):
  mp_face = mp.solutions.face_detection
  mp_drawing = mp.solutions.drawing_utils
  frame_count = 0
  # one image per time_step (in seconds) - to avoid memory issue get only one image per 3 second.
                # we are just experimenting

  # read the video
  cap = cv2.VideoCapture(videos_path + "/" + video_name)

  frame_rate = cap.get(cv2.CAP_PROP_FPS)
  frame_skip = frame_rate*time_step

  n = 0 # for limiting outputed images - avoid printing size exceeded issue

  with mp_face.FaceDetection(min_detection_confidence=0.5, model_selection=0) as face_detection: # model_selection = 0 --> camera distance less than 2 meters
    while cap.isOpened():
      ret, frame = cap.read()  # ret : True if the frame was read successfully. False otherwise
      if not ret:
        break
      # Skip frames
      if frame_count % frame_skip != 0:
        frame_count += 1
        continue

      frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) # transform from bgr to rgb
      results = face_detection.process(frame_rgb)
      if results.detections: # normally we should detect only one face !
        for detection in results.detections:
          mp_drawing.draw_detection(frame_rgb, detection)

      # get the face only - assuming we detected one face in the frame
      bbox = results.detections[0].location_data.relative_bounding_box
      # get dimensions of img_rgb
      height, width, _ = frame_rgb.shape
      x = int(bbox.xmin * width)
      y = int(bbox.ymin * height)
      w = int(bbox.width * width)
      h = int(bbox.height * height)
      face_image = frame_rgb[y:y+h, x:x+w]
      # print("face number : " + str(n))
      # cv2_imshow(face_image)

      # Save the frame as an image
      face_image = cv2.cvtColor(face_image, cv2.COLOR_RGB2BGR)
      cv2.imwrite(frames_path + 'frame_' + str(frame_count) + '.jpg', face_image)

      frame_count += 1
      n = n+1

  cap.release()
  cv2.destroyAllWindows()

# Process all videos

In [60]:
!rm -rf /content/drive/MyDrive/Sonocare/sonocare/frames/*

In [61]:
# path for videos
videos_path = "/content/drive/MyDrive/Sonocare/sonocare/African_Skintones_VBPE_dataset_filtered_videos/"

In [62]:
# put files of videos_path folder in a list
videos_list = os.listdir(videos_path)

In [63]:
videos_list = [v for v in videos_list if v.endswith(".mp4")]
videos_list = [v for v in videos_list if v.startswith("Copy")]
videos_list

['Copy of SL463.mp4',
 'Copy of SL497.mp4',
 'Copy of SL674.mp4',
 'Copy of SL470.mp4',
 'Copy of SL433.mp4',
 'Copy of SL481.mp4',
 'Copy of SL465.mp4',
 'Copy of SL689 (1).mp4',
 'Copy of SL688 (1).mp4',
 'Copy of SL682 (1).mp4',
 'Copy of SL484.mp4',
 'Copy of SL687 (1).mp4',
 'Copy of SL679.mp4',
 'Copy of SL410.mp4',
 'Copy of SL428.mp4',
 'Copy of SL337.mp4',
 'Copy of SL348.mp4',
 'Copy of SL426.mp4',
 'Copy of SL411.mp4',
 'Copy of SL351.mp4',
 'Copy of SL377.mp4',
 'Copy of SL424.mp4',
 'Copy of SL309.mp4',
 'Copy of SL412.mp4',
 'Copy of SL310.mp4',
 'Copy of SL407.mp4',
 'Copy of SL413.mp4',
 'Copy of SL577.mp4',
 'Copy of SL630.mp4',
 'Copy of SL633.mp4',
 'Copy of SL576.mp4',
 'Copy of SL597.mp4',
 'Copy of SL585.mp4',
 'Copy of SL628.mp4',
 'Copy of SL593.mp4',
 'Copy of SL592.mp4',
 'Copy of SL580.mp4',
 'Copy of SL591.mp4',
 'Copy of SL590.mp4',
 'Copy of SL583.mp4',
 'Copy of SL656.mp4',
 'Copy of SL662.mp4',
 'Copy of SL668.mp4',
 'Copy of SL655.mp4',
 'Copy of SL661.

In [64]:
frames_path = "/content/drive/MyDrive/Sonocare/sonocare/frames/"

In [65]:
for v in videos_list:#[0:20]:
  if not os.path.exists(frames_path + v[:-4] + "/"):
    os.makedirs(frames_path + v[:-4] + "/")
    try:
      split_cap(v, videos_path, frames_path + v[:-4] + "/")
    except:
      print("error in video : " + v)
      continue
    # split_cap(v, videos_path, frames_path + v[:-4] + "/")
  else:
    print("folder already exists ! - video not proccessed !" )

error in video : Copy of SL679.mp4
error in video : Copy of SL622.mp4
error in video : Copy of SL183.mp4
error in video : Copy of SL543.mp4
error in video : Copy of SL220.mp4
