In [2]:
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision
import numpy as np
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2
import numpy as np
from PIL import Image
import cv2
import time
import mediapipe as mp

In [14]:
# Model asset file handed to BaseOptions(model_asset_path=...) by the cells
# below; the path is relative to the notebook's working directory.
model_path = 'pose_landmarker_heavy.task'


In [4]:
def draw_landmarks_on_image(rgb_image, detection_result):
  """Return a copy of `rgb_image` with every detected pose drawn on it.

  Args:
    rgb_image: Image array to annotate. It is copied first, so the caller's
      array is left untouched.
    detection_result: Object exposing `pose_landmarks`, a sequence holding one
      list of landmarks (each with `x`, `y` and `z` attributes) per pose.

  Returns:
    The annotated copy of `rgb_image`. When no poses were detected this is
    simply an unmodified copy.
  """
  canvas = np.copy(rgb_image)

  for pose in detection_result.pose_landmarks:
    # The drawing utility takes a NormalizedLandmarkList proto, so repack
    # the landmark coordinates into one.
    normalized_points = [
      landmark_pb2.NormalizedLandmark(x=point.x, y=point.y, z=point.z)
      for point in pose
    ]
    pose_proto = landmark_pb2.NormalizedLandmarkList()
    pose_proto.landmark.extend(normalized_points)

    # Draw the landmarks and their connections in place on the copy.
    solutions.drawing_utils.draw_landmarks(
      canvas,
      pose_proto,
      solutions.pose.POSE_CONNECTIONS,
      solutions.drawing_styles.get_default_pose_landmarks_style())

  return canvas

# Testing with images

In [4]:
# Load the input image from disk as a MediaPipe Image. The path is relative,
# so 'person.png' must be in the notebook's working directory.
mp_image = mp.Image.create_from_file(r'person.png')


In [5]:
# Inspect the array shape of the loaded image. The recorded output has 4
# entries on the last axis (presumably RGBA), which is why the drawing cell
# further down slices it to the first 3.
mp_image.numpy_view().shape

(573, 433, 4)

In [6]:
# Short aliases for the MediaPipe Tasks classes used in this cell.
BaseOptions = mp.tasks.BaseOptions
PoseLandmarker = mp.tasks.vision.PoseLandmarker
PoseLandmarkerOptions = mp.tasks.vision.PoseLandmarkerOptions
VisionRunningMode = mp.tasks.vision.RunningMode

# Configure the landmarker for single still images (IMAGE running mode).
options = PoseLandmarkerOptions(
    running_mode=VisionRunningMode.IMAGE,
    base_options=BaseOptions(model_asset_path=model_path),
)

# The landmarker only lives for the duration of the `with` block; the
# detection result is kept in `pose_landmarker_result` for the cells below.
with PoseLandmarker.create_from_options(options) as landmarker:
  pose_landmarker_result = landmarker.detect(mp_image)
  print(pose_landmarker_result)
    

PoseLandmarkerResult(pose_landmarks=[[NormalizedLandmark(x=0.4420117139816284, y=0.15436691045761108, z=-0.5071224570274353, visibility=0.9999953508377075, presence=0.9999985694885254), NormalizedLandmark(x=0.4532772898674011, y=0.13756179809570312, z=-0.48850584030151367, visibility=0.9999868869781494, presence=0.9999958276748657), NormalizedLandmark(x=0.46256303787231445, y=0.136842280626297, z=-0.4890199601650238, visibility=0.9999853372573853, presence=0.9999960660934448), NormalizedLandmark(x=0.4717201590538025, y=0.1363580822944641, z=-0.4889039397239685, visibility=0.9999887943267822, presence=0.999995231628418), NormalizedLandmark(x=0.4353560507297516, y=0.13897287845611572, z=-0.47310489416122437, visibility=0.9999892711639404, presence=0.9999932050704956), NormalizedLandmark(x=0.43136879801750183, y=0.13936001062393188, z=-0.47354018688201904, visibility=0.9999899864196777, presence=0.9999932050704956), NormalizedLandmark(x=0.4283042848110199, y=0.13957273960113525, z=-0.4737

In [7]:

# Keep only the first 3 channels: the loaded image's recorded shape has 4
# channels, while the drawing helper and the JPEG output work on 3.
annotated_array = draw_landmarks_on_image(mp_image.numpy_view()[:, :, :3], pose_landmarker_result)

# Convert to a PIL image once (the original cell converted twice and threw
# the first result away), then preview it and write it to disk.
annotated_image = Image.fromarray(annotated_array)
annotated_image.show()
annotated_image.save('output.jpg')


# Testing with videos

In [17]:
import mediapipe as mp
import cv2

BaseOptions = mp.tasks.BaseOptions
PoseLandmarker = mp.tasks.vision.PoseLandmarker
PoseLandmarkerOptions = mp.tasks.vision.PoseLandmarkerOptions
VisionRunningMode = mp.tasks.vision.RunningMode
# NOTE(review): absolute, machine-specific path; point this at a local copy
# of the video before running on another machine.
video_path = r'C:\Users\trh00\OneDrive\Documents\SeniorProject\datasets\fit3d_dataset\test\s02\videos\squat.mp4'

# Annotated frames (BGR, as read by OpenCV) collected for later export.
frames = []
# Create a pose landmarker instance with the video mode:
options = PoseLandmarkerOptions(
    base_options=BaseOptions(model_asset_path=model_path),
    running_mode=VisionRunningMode.VIDEO)

with PoseLandmarker.create_from_options(options) as landmarker:
  # Use OpenCV's VideoCapture to load the input video.
  video_capture = cv2.VideoCapture(video_path)
  try:
    if not video_capture.isOpened():
      # Raise instead of calling exit(): in a notebook, exit() ends the
      # kernel session instead of just reporting the problem.
      raise RuntimeError("Error: Could not open video.")

    # The frame rate is needed to compute each frame's timestamp.
    frame_rate = video_capture.get(cv2.CAP_PROP_FPS)
    if not frame_rate > 0:
      # Some files/backends report 0 (or NaN) FPS, which would otherwise
      # cause a division by zero; fall back to a nominal rate.
      frame_rate = 30.0

    frame_index = 0
    timestamp = 0
    # Loop through each frame in the video using VideoCapture#read()
    while video_capture.isOpened():
      success, frame = video_capture.read()

      if not success:
        break

      # OpenCV decodes frames as BGR, but the image is declared as SRGB,
      # so convert before handing it to MediaPipe.
      frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
      mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame_rgb)

      # Derive the timestamp from the frame index so per-frame integer
      # truncation does not accumulate into drift over long videos.
      timestamp = int(frame_index * 1000 / frame_rate)
      pose_landmarker_result = landmarker.detect_for_video(mp_image, timestamp)
      frame_index += 1

      # Draw on the original BGR frame so the stored frames stay in the
      # channel order OpenCV expects for imshow / VideoWriter.
      annotated_image = draw_landmarks_on_image(frame, pose_landmarker_result)
      frames.append(annotated_image)
      # cv2.imshow('MediaPipe Pose Landmarking', annotated_image)
      # if cv2.waitKey(5) & 0xFF == 27:
      #   break
  finally:
    # Always free the capture and any preview windows, even on errors.
    video_capture.release()
    cv2.destroyAllWindows()

#   # specifying output video path 
# output_video = r'output_fullTask.mp4'
# fourcc = cv2.VideoWriter_fourcc(*'XVID')
# fps = 50
# video_writer = cv2.VideoWriter(output_video, fourcc, fps, (annotated_image.shape[1], annotated_image.shape[0]))
# for frame in frames:
#     video_writer.write(frame)
# video_writer.release()
# print('Output video is ready at', output_video)

  
  
      

In [9]:
# Query the frame rate reported by the capture object from the video cell.
# NOTE(review): the recorded output is 0.0, presumably because the capture had
# already been released or never opened when this ran; confirm.
video_capture.get(cv2.CAP_PROP_FPS)

0.0

# Testing with webcam

In [None]:
BaseOptions = mp.tasks.BaseOptions
PoseLandmarker = mp.tasks.vision.PoseLandmarker
PoseLandmarkerOptions = mp.tasks.vision.PoseLandmarkerOptions
PoseLandmarkerResult = mp.tasks.vision.PoseLandmarkerResult
VisionRunningMode = mp.tasks.vision.RunningMode


def print_result(result: PoseLandmarkerResult, output_image: mp.Image, timestamp_ms: int):
    """Result callback for live-stream mode: print each detection result.

    Args:
        result: Pose landmarker result for one processed frame.
        output_image: Image passed to the callback (unused here).
        timestamp_ms: Timestamp passed to the callback (unused here).
    """
    print('pose landmarker result: {}'.format(result))


# Create a pose landmarker instance with the live stream mode:
options = PoseLandmarkerOptions(
    base_options=BaseOptions(model_asset_path=model_path),
    running_mode=VisionRunningMode.LIVE_STREAM,
    result_callback=print_result)

with PoseLandmarker.create_from_options(options) as landmarker:
    # Open the webcam stream (0 for the default camera)
    cap = cv2.VideoCapture(0)
    try:
        if not cap.isOpened():
            # Raise instead of calling exit(): in a notebook, exit() ends
            # the kernel session instead of just reporting the problem.
            raise RuntimeError("Error: Could not open webcam.")

        # Use a monotonic clock for frame timestamps so that wall-clock
        # adjustments can never make them go backwards.
        start_time = time.monotonic()
        last_timestamp_ms = -1

        while True:
            success, frame = cap.read()
            if not success:
                print("Failed to capture frame from webcam.")
                break

            # Optional: Flip the frame horizontally for a mirror view.
            frame = cv2.flip(frame, 1)

            # Convert the frame from BGR (OpenCV default) to RGB (expected by MediaPipe)
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Convert the frame to MediaPipe's Image format.
            mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame_rgb)

            # Compute the current frame's timestamp in milliseconds and force
            # it to be strictly greater than the previous one.
            frame_timestamp_ms = int((time.monotonic() - start_time) * 1000)
            if frame_timestamp_ms <= last_timestamp_ms:
                frame_timestamp_ms = last_timestamp_ms + 1
            last_timestamp_ms = frame_timestamp_ms

            # Asynchronously process the frame. The results will be available via print_result.
            landmarker.detect_async(mp_image, frame_timestamp_ms)

            # Display the frame (optional)
            cv2.imshow('Live Stream', frame)
            if cv2.waitKey(5) & 0xFF == 27:  # Exit loop when ESC key is pressed
                break
    finally:
        # Always free the webcam and close the preview window, even when
        # the loop is interrupted or an error occurs.
        cap.release()
        cv2.destroyAllWindows()