In [1]:
import torch
print("GPU Available:", torch.cuda.is_available())


GPU Available: False


In [2]:
from google.colab import drive
drive.mount('/content/drive')


Mounted at /content/drive


In [3]:
!pip install opencv-python mediapipe moviepy


Collecting mediapipe
  Downloading mediapipe-0.10.15-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.7 kB)
Collecting protobuf<5,>=4.25.3 (from mediapipe)
  Downloading protobuf-4.25.5-cp37-abi3-manylinux2014_x86_64.whl.metadata (541 bytes)
Collecting sounddevice>=0.4.4 (from mediapipe)
  Downloading sounddevice-0.5.1-py3-none-any.whl.metadata (1.4 kB)
Downloading mediapipe-0.10.15-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (35.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m35.9/35.9 MB[0m [31m22.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading protobuf-4.25.5-cp37-abi3-manylinux2014_x86_64.whl (294 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m294.6/294.6 kB[0m [31m21.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading sounddevice-0.5.1-py3-none-any.whl (32 kB)
Installing collected packages: protobuf, sounddevice, mediapipe
  Attempting uninstall: protobuf
    Found existing installation: protobuf 3.20.

In [4]:
from pathlib import Path
import shutil
from datetime import datetime
import math

from scipy.signal import savgol_filter

import cv2
import mediapipe as mp
import numpy as np

OUTPUT_VIDEO_WIDTH = 1920
OUTPUT_VIDEO_HEIGHT = 1080
LEFT_EYE_SHIFT = 480
RIGHT_EYE_SHIFT = 480
OUTPUT_PATH_TMP = 'output_video_tmp.mp4'
EYE_OPEN_THRESHOLD = 0.2
RECALC_AREA_FRAMES = 10
TARGET_WIDTH = 1280
TARGET_HEIGHT = 720


In [5]:
def get_pupils(landmarks):
  left_pupil = (
      (landmarks.landmark[33].x + landmarks.landmark[36].x) / 2,
      (landmarks.landmark[33].y + landmarks.landmark[36].y) / 2
    )

  right_pupil = (
      (landmarks.landmark[133].x + landmarks.landmark[136].x) / 2,
      (landmarks.landmark[133].y + landmarks.landmark[136].y) / 2
    )
  return left_pupil, right_pupil


In [6]:
def get_eye_aspect_ratio(landmarks):
    # Calculate distances for EAR
    left_eye_height = (
        landmarks.landmark[159].y - landmarks.landmark[145].y +  # Vertical distance
        landmarks.landmark[158].y - landmarks.landmark[144].y
    )
    right_eye_height = (
        landmarks.landmark[386].y - landmarks.landmark[374].y +  # Vertical distance
        landmarks.landmark[387].y - landmarks.landmark[373].y
    )
    left_eye_width = landmarks.landmark[36].x - landmarks.landmark[39].x  # Horizontal distance
    right_eye_width = landmarks.landmark[123].x - landmarks.landmark[130].x  # Horizontal distance

    # Calculate EAR for both eyes
    left_eye_ratio = left_eye_height / (2.0 * left_eye_width)
    right_eye_ratio = right_eye_height / (2.0 * right_eye_width)

    return left_eye_ratio, right_eye_ratio

In [7]:
def get_eye_center(landmarks):
  left_eye_landmarks = [(landmarks.landmark[i].x, landmarks.landmark[i].y) for i in range(33, 39)]
  right_eye_landmarks = [(landmarks.landmark[i].x, landmarks.landmark[i].y) for i in range(133, 139)]

  left_eye_center = (
      sum(x for x, _ in left_eye_landmarks) / len(left_eye_landmarks),
      sum(y for _, y in left_eye_landmarks) / len(left_eye_landmarks)
      )
  right_eye_center = (
      sum(x for x, _ in right_eye_landmarks) / len(right_eye_landmarks),
      sum(y for _, y in right_eye_landmarks) / len(right_eye_landmarks)
      )
  return left_eye_center, right_eye_center

In [8]:
def exponential_smoothing(angles, angle, alpha=0.8):
    if angles:
        smoothed_angle = alpha * angles[-1] + (1 - alpha) * angle
    else:
        smoothed_angle = angle  # If no previous angles, just return the current angle
    return smoothed_angle


In [9]:
def exponential_smoothing_midpoint(midpoints, new_midpoint, alpha=0.8):
    if midpoints:
        smoothed_midpoint = (
            alpha * midpoints[-1][0] + (1 - alpha) * new_midpoint[0],
            alpha * midpoints[-1][1] + (1 - alpha) * new_midpoint[1]
        )
    else:
        smoothed_midpoint = new_midpoint  # If no previous midpoints, just return the current midpoint
    return smoothed_midpoint


In [10]:
def get_rotation_matrix(landmarks, angles, midpoints):

  # Get eye aspect ratios
  left_eye_ratio, right_eye_ratio = get_eye_aspect_ratio(landmarks)

  if left_eye_ratio < EYE_OPEN_THRESHOLD and right_eye_ratio < EYE_OPEN_THRESHOLD:
    print("Eyes are closed.")
    if angles and midpoints:
      print('Previous angle was used')
      angle = angles[-1]
      midpoint = midpoints[-1]

  else:
    left_pupil, right_pupil = get_pupils(landmarks)
    dx = right_pupil[0] - left_pupil[0]
    dy = right_pupil[1] - left_pupil[1]
    midpoint = (
      (left_pupil[0] + right_pupil[0]) / 2,
      (left_pupil[1] + right_pupil[1]) / 2
    )

    angle = math.degrees(math.atan2(dy, dx))
    smoothed_angle = exponential_smoothing(angles, angle)
    angles.append(smoothed_angle)
    smoothed_midpoint = exponential_smoothing_midpoint(midpoints, midpoint)
    midpoints.append(smoothed_midpoint)

  rotation_matrix = cv2.getRotationMatrix2D(smoothed_midpoint, smoothed_angle, 1.0)
  return rotation_matrix, angles, midpoints



In [11]:
def process_frame(frame, landmarks, angles, midpoints):

  height, width = frame.shape[:2]

  # Create rotation matrix
  # rotation_matrix = cv2.getRotationMatrix2D(center, angle, 1.0)
  rotation_matrix, angles, midpoints = get_rotation_matrix(landmarks, angles, midpoints)
  # Rotate the frame
  rotated_frame = cv2.warpAffine(frame, rotation_matrix, (width, height))
  # crop the fram
  # fill to 1920x1080

  return rotated_frame, angles, midpoints

In [12]:
def update_crop_params(landmarks, params):
    frame_height = params['frame_height']

    left_pupil, right_pupil = get_pupils(landmarks)
    print(f'Left pupil: {left_pupil}, Right pupil: {right_pupil}')

    dx = right_pupil[0] - left_pupil[0]
    dy = right_pupil[1] - left_pupil[1]
    midpoint = (
      (left_pupil[0] + right_pupil[0]) / 2,
      (left_pupil[1] + right_pupil[1]) / 2)

    w = int(2 * dx)
    h = int(9 / 16 * w)
    x = int(midpoint[0] - dx)
    y = int(min(frame_height - h, midpoint[1] - 0.5 * h))
    print(f'x: {x}, y: {y}, w: {w}, h: {h}')

    # params['crop_wide'].append(w)
    # params['crop_height'].append(h)
    # params['crop_x'].append(x)
    # params['crop_y'].append(y)
  # new_params = {}
    params['crop_wide'].append(200)
    params['crop_height'].append(200)
    params['crop_x'].append(200)
    params['crop_y'].append(200)
  # new_params = {}
  # params.append(new_params)

In [13]:
def crop_frame(frame, params):
  # if not (params['crop_wide'] and params['crop_height'] and params['crop_x'] and params['crop_y']):
    # return fram
  x = params['crop_x'][-1]
  y = params['crop_y'][-1]
  w = params['crop_wide'][-1]
  h = params['crop_height'][-1]
  # if not (params['crop_wide'] and params['crop_height'] and params['crop_x'] and params['crop_y']):
    # return frame
  frame = frame[y:y + h, x:x + w]
  return frame

In [17]:
def process_video(in_video, out_video, face_mesh, params):
    # n_face, n_no_face = 0, 0
    # angles = []
    # midpoints = []
    frames_counter = 0

    while True:
      ret, frame = in_video.read()
      if not ret:
          break
      rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

      if not frames_counter:
        face_mask_results = face_mesh.process(rgb_frame)
        if face_mask_results.multi_face_landmarks:
          frames_counter = RECALC_AREA_FRAMES
          landmarks = face_mask_results.multi_face_landmarks[0]
          update_crop_params(landmarks, params)
          # TODO: apply angle transformation and recalc landmarks
          # get the crop/matrix/parameters here
          # use those parameters or smth to change frame
          # new_frame, angles, midpoints = process_frame(frame, landmarks, angles, midpoints)
          # out.write(new_frame)
        else:
          print('Face not found')
      else:
        frames_counter -= 1

      cropped_frame = crop_frame(frame, params)
      resized_frame = cv2.resize(cropped_frame, (OUTPUT_VIDEO_WIDTH, OUTPUT_VIDEO_HEIGHT))
      out_video.write(resized_frame)

    in_video.release()
    out_video.release()

    return out_video


In [18]:
def get_params():
  params = {}
  params['crop_wide'] = []
  params['crop_height'] = []
  params['crop_x'] = []
  params['crop_y'] = []
  return params

In [22]:
def get_video_captures(video_path):
    params = get_params()

    assert Path(video_path).exists(), f"Video file not found: {video_path}"
    cap = cv2.VideoCapture(video_path)
    # FRAME_WIDTH = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    FRAME_HEIGHT = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    params['frame_height'] = FRAME_HEIGHT

    fps = cap.get(cv2.CAP_PROP_FPS)
    # print(f"FPS: {fps}")
    out_video = get_out_video_writer(fps)

    return cap, out_video, params

In [23]:
def save_output(out_video, video_path):

    output_path = Path(video_path).parent / (Path(video_path).stem + '.mp4')
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_path = Path(video_path).parent / f"{Path(video_path).stem}_{timestamp}_processed.mp4"

    shutil.copy(OUTPUT_PATH_TMP, output_path)
    # print(f'Saved as {output_path}')


In [26]:
#
def crop_videofile(video_path: str, face_mesh):

    in_video, out_video, params = get_video_captures(video_path)

    out_video = process_video(in_video, out_video, face_mesh, params)

    save_output(out_video, video_path)


In [27]:
def get_out_video_writer(fps):
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(OUTPUT_PATH_TMP,
                          fourcc,
                          fps,
                          (OUTPUT_VIDEO_WIDTH, OUTPUT_VIDEO_HEIGHT))
    return out

In [28]:
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(static_image_mode=False, max_num_faces=1)
video_path = '/content/drive/MyDrive/Personal/BUDKA/cropped_short.mp4'
crop_videofile(video_path, face_mesh)



Left pupil: (0.5457773208618164, 0.2177230343222618), Right pupil: (0.5715020596981049, 0.3057642877101898)
x: 0, y: 0, w: 0, h: 0
Left pupil: (0.5451674461364746, 0.22365278750658035), Right pupil: (0.5744246244430542, 0.3093353882431984)
x: 0, y: 0, w: 0, h: 0
Left pupil: (0.5539412796497345, 0.21649093180894852), Right pupil: (0.5806784629821777, 0.3046976625919342)
x: 0, y: 0, w: 0, h: 0


In [None]:
for i in dir(face_mesh):
  print(f"{i} - {getattr(face_mesh, i)}")

__class__ - <class 'mediapipe.python.solutions.face_mesh.FaceMesh'>
__delattr__ - <method-wrapper '__delattr__' of FaceMesh object at 0x7d91940708e0>
__dict__ - {'_input_stream_type_info': {'image': <PacketDataType.IMAGE: 'image'>}, '_output_stream_type_info': {'multi_face_landmarks': <PacketDataType.PROTO_LIST: 'proto_list'>}, '_side_input_type_info': {'num_faces': <PacketDataType.INT: 'int'>, 'with_attention': <PacketDataType.BOOL: 'bool'>, 'use_prev_landmarks': <PacketDataType.BOOL: 'bool'>}, '_graph': <mediapipe.python._framework_bindings.calculator_graph.CalculatorGraph object at 0x7d91944e2a30>, '_simulated_timestamp': 99999, '_graph_outputs': {'multi_face_landmarks': <mediapipe.Packet with timestamp: 99999 and C++ type: ::std::vector<::mediapipe::NormalizedLandmarkList>>}, '_input_side_packets': {'num_faces': <mediapipe.Packet with timestamp: UNSET and C++ type: int>, 'with_attention': <mediapipe.Packet with timestamp: UNSET and C++ type: bool>, 'use_prev_landmarks': <mediapipe.

In [None]:
help(face_mesh.process)

Help on method process in module mediapipe.python.solutions.face_mesh:

process(image: numpy.ndarray) -> <function NamedTuple at 0x7d92d7d7ae60> method of mediapipe.python.solutions.face_mesh.FaceMesh instance
    Processes an RGB image and returns the face landmarks on each detected face.
    
    Args:
      image: An RGB image represented as a numpy ndarray.
    
    Raises:
      RuntimeError: If the underlying graph throws any error.
      ValueError: If the input image is not three channel RGB.
    
    Returns:
      A NamedTuple object with a "multi_face_landmarks" field that contains the
      face landmarks on each detected face.



In [None]:
for i in dir(mp_face_mesh):
  print(f"{i} - {getattr(mp_face_mesh, i)}")

FACEMESH_CONTOURS - frozenset({(270, 409), (176, 149), (37, 0), (84, 17), (318, 324), (293, 334), (386, 385), (7, 163), (33, 246), (17, 314), (374, 380), (251, 389), (390, 373), (267, 269), (295, 285), (389, 356), (173, 133), (33, 7), (377, 152), (158, 157), (405, 321), (54, 103), (263, 466), (324, 308), (67, 109), (409, 291), (157, 173), (454, 323), (388, 387), (78, 191), (148, 176), (311, 310), (39, 37), (249, 390), (144, 145), (402, 318), (80, 81), (310, 415), (153, 154), (384, 398), (397, 365), (234, 127), (103, 67), (282, 295), (338, 297), (378, 400), (127, 162), (321, 375), (375, 291), (317, 402), (81, 82), (154, 155), (91, 181), (334, 296), (297, 332), (269, 270), (150, 136), (109, 10), (356, 454), (58, 132), (312, 311), (152, 148), (415, 308), (161, 160), (296, 336), (65, 55), (61, 146), (78, 95), (380, 381), (398, 362), (361, 288), (246, 161), (162, 21), (0, 267), (82, 13), (132, 93), (314, 405), (10, 338), (178, 87), (387, 386), (381, 382), (70, 63), (61, 185), (14, 317), (10