### Preparations

In [1]:
%pip install -q mediapipe opencv-python

Note: you may need to restart the kernel to use updated packages.


### Importing Libraries

In [2]:
import cv2
from time import perf_counter
import mediapipe as mp
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2
import numpy as np
import os

### Configuration and Initialization

In [3]:
# Short aliases for the MediaPipe Tasks API classes used by the cells below.
BaseOptions = mp.tasks.BaseOptions
VisionRunningMode = mp.tasks.vision.RunningMode
PoseLandmarker = mp.tasks.vision.PoseLandmarker
PoseLandmarkerOptions = mp.tasks.vision.PoseLandmarkerOptions
PoseLandmarkerResult = mp.tasks.vision.PoseLandmarkerResult

# Alias for the legacy drawing helpers.
# NOTE(review): no visible cell references `mp_drawing` -- confirm before removing.
mp_drawing = mp.solutions.drawing_utils

# The model bundle is looked up in the current working directory of the kernel.
model_path = os.path.join(os.getcwd(), 'pose_landmarker_full.task')

### Helper Functions

#### Calculate Angle

In [4]:
def calculate_angle(a, b, c):
    """Return the angle at vertex ``b`` between the rays ``b->a`` and ``b->c``.

    Only the first two components (index 0 and 1) of each point are used.

    Args:
        a: First point, an indexable sequence with at least two numbers.
        b: Vertex (middle) point, same layout as ``a``.
        c: End point, same layout as ``a``.

    Returns:
        The angle in degrees, folded into the range [0, 180].
    """
    first, mid, end = (np.array(point) for point in (a, b, c))

    # Direction of each ray as seen from the vertex, in radians.
    heading_to_end = np.arctan2(end[1] - mid[1], end[0] - mid[0])
    heading_to_first = np.arctan2(first[1] - mid[1], first[0] - mid[0])

    angle = np.abs((heading_to_end - heading_to_first) * 180.0 / np.pi)

    # A difference above 180 degrees is the reflex angle; report the inner one.
    return 360 - angle if angle > 180.0 else angle


#### Draw Landmarks on Image

In [5]:
def draw_landmarks_on_image(rgb_image, detection_result):
    """Return a copy of ``rgb_image`` with every detected pose drawn on it.

    Args:
        rgb_image: Image array to annotate; it is copied, never modified.
        detection_result: Object exposing ``pose_landmarks``, an iterable with
            one landmark sequence per detected pose. Each landmark must
            provide ``x``, ``y`` and ``z`` attributes.

    Returns:
        The annotated copy. When no pose was detected this is an unmodified
        copy of the input.
    """
    detected_poses = detection_result.pose_landmarks
    canvas = np.copy(rgb_image)

    for pose in detected_poses:
        # The drawing helper takes a NormalizedLandmarkList proto, so repack
        # the landmarks of this pose into one.
        normalized_points = [
            landmark_pb2.NormalizedLandmark(x=point.x, y=point.y, z=point.z)
            for point in pose
        ]
        pose_proto = landmark_pb2.NormalizedLandmarkList()
        pose_proto.landmark.extend(normalized_points)

        solutions.drawing_utils.draw_landmarks(
            canvas,
            pose_proto,
            solutions.pose.POSE_CONNECTIONS,
            solutions.drawing_styles.get_default_pose_landmarks_style(),
        )

    return canvas


#### Print Result Callback

In [6]:
# Most recent annotated frame written by the result callback; stays None until
# the callback has run at least once.
annotated_image = None


def print_result(result: PoseLandmarkerResult, output_image: mp.Image, timestamp_ms: int):
    """Result callback: draw the detected landmarks and publish the frame.

    Despite its name this function prints nothing; it stores the annotated
    frame in the module-level ``annotated_image``.

    Args:
        result: Detection result forwarded to ``draw_landmarks_on_image``.
        output_image: Image the result belongs to; its ``numpy_view()`` is
            used as the drawing background.
        timestamp_ms: Timestamp of the frame (unused here).
    """
    global annotated_image
    frame_pixels = output_image.numpy_view()
    annotated_image = draw_landmarks_on_image(frame_pixels, result)


### Main Script

#### Curl Counter Variables

In [7]:
# Curl-counter state, initialised once per kernel session.
# NOTE(review): no visible cell reads or updates these yet -- presumably
# `counter` is the repetition count and `stage` the current movement phase.
counter, stage = 0, None


#### Pose Landmarker Initialization

In [8]:
# Landmarker configuration: live-stream mode delivers results through the
# callback instead of returning them from the detect call.
options = PoseLandmarkerOptions(
    running_mode=VisionRunningMode.LIVE_STREAM,
    result_callback=print_result,
    base_options=BaseOptions(model_asset_path=model_path),
)

#### Setup and Run Pose Estimator

In [9]:
# Reference point for the per-frame timestamps handed to the landmarker.
start_time = perf_counter()

# Drop any frame left over from a previous run of this cell so that a stale
# annotation is never shown before the first new result arrives.
annotated_image = None

# Open the default camera and stream frames through the pose landmarker.
cap = cv2.VideoCapture(0)
try:
    with PoseLandmarker.create_from_options(options) as landmarker:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                # The grab failed (camera unplugged, stream ended, ...);
                # `frame` is not usable, so stop instead of crashing below.
                break

            # OpenCV delivers BGR, but the image is declared as SRGB, so the
            # channels must be reordered before wrapping the frame.
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)

            # Milliseconds since the cell started, used as the frame timestamp.
            elapsed_time_ms = int((perf_counter() - start_time) * 1000)

            # Queue the frame; the result arrives later through the callback.
            landmarker.detect_async(mp_image, elapsed_time_ms)

            # Read the global once: the callback may rebind it at any time.
            latest_annotated = annotated_image
            if latest_annotated is not None:
                # The annotated frame is RGB (it was drawn on the RGB input),
                # while cv2.imshow expects BGR.
                cv2.imshow('Mediapipe Feed', cv2.cvtColor(latest_annotated, cv2.COLOR_RGB2BGR))
            else:
                cv2.imshow('Mediapipe Feed', frame)

            # Press "q" in the preview window to stop.
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
finally:
    # Always free the camera and close the window, including when the cell is
    # interrupted from the notebook (KeyboardInterrupt) or an error occurs.
    cap.release()
    cv2.destroyAllWindows()


I0000 00:00:1717181341.995007 20323389 gl_context.cc:357] GL version: 2.1 (2.1 Metal - 88), renderer: Apple M2
INFO: Created TensorFlow Lite XNNPACK delegate for CPU.
W0000 00:00:1717181342.071740 20323642 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1717181342.078245 20323644 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


KeyboardInterrupt: 