In [None]:
import cv2
import numpy as np
import mediapipe as mp
from mediapipe import solutions
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
from mediapipe.framework.formats import landmark_pb2

In [None]:
def draw_landmarks_on_image(rgb_image, detection_result):
  pose_landmarks_list = detection_result.pose_landmarks
  annotated_image = np.copy(rgb_image)

  # Loop through the detected poses to visualize.
  for idx in range(len(pose_landmarks_list)):
    pose_landmarks = pose_landmarks_list[idx]

    # Draw the pose landmarks.
    pose_landmarks_proto = landmark_pb2.NormalizedLandmarkList()
    pose_landmarks_proto.landmark.extend([
      landmark_pb2.NormalizedLandmark(x=landmark.x, y=landmark.y, z=landmark.z) for landmark in pose_landmarks
    ])
    solutions.drawing_utils.draw_landmarks(
      annotated_image,
      pose_landmarks_proto,
      solutions.pose.POSE_CONNECTIONS,
      solutions.drawing_styles.get_default_pose_landmarks_style())
  return annotated_image

In [None]:
#Creating the Landmark object
base_options = python.BaseOptions(model_asset_path='../models/pose_landmarker_heavy.task')
VisionRunningMode = mp.tasks.vision.RunningMode
options = vision.PoseLandmarkerOptions(
    base_options=base_options,
    output_segmentation_masks=True,
    running_mode=VisionRunningMode.VIDEO)
detector = vision.PoseLandmarker.create_from_options(options)

In [None]:
VIDEO_PATH    = "../data/raw/video0.mp4"
OUTPUT_PATH   = "../data/processed/video0_annotated.mp4"

cap = cv2.VideoCapture(VIDEO_PATH)
if not cap.isOpened():
    raise RuntimeError(f"Could not open video file: {VIDEO_PATH}")

fps    = cap.get(cv2.CAP_PROP_FPS) or 30.0
width  = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

fourcc = cv2.VideoWriter_fourcc(*'mp4v')              # or 'avc1', 'XVID', etc.
out    = cv2.VideoWriter(OUTPUT_PATH, fourcc, fps, (width, height))


frame_idx = 0
while True:
    success, frame_bgr = cap.read()
    if not success:
        break

    frame_rgb  = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2RGB)
    mp_image   = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame_rgb)
    timestamp_ms = int(frame_idx * (1000.0 / fps))
    result     = detector.detect_for_video(mp_image, timestamp_ms)

    annotated_rgb = draw_landmarks_on_image(frame_rgb, result)
    annotated_bgr = cv2.cvtColor(annotated_rgb, cv2.COLOR_RGB2BGR)


    out.write(annotated_bgr)
    frame_idx += 1

cap.release()
out.release()
cv2.destroyAllWindows()

print(f"Saved annotated video to {OUTPUT_PATH}")