In [None]:
import cv2
import pandas as pd
from tqdm import tqdm

In [None]:
import mediapipe as mp


In [3]:
# Location of the pose-landmarker model file handed to BaseOptions(model_asset_path=...).
model_path = "../data/models/pose_landmarker_heavy.task"

In [4]:
# File locations used by the processing cell, all relative to the notebook:
#   - the source clip that is read frame by frame,
#   - the annotated copy of that clip written by cv2.VideoWriter,
#   - the per-frame / per-person landmark table written with DataFrame.to_csv.
VIDEO_INPUT_PATH = "../data/3196221-uhd_3840_2160_25fps.mp4"
VIDEO_OUTPUT_PATH = "../data/pose_output2.mp4"
CSV_OUTPUT_PATH = "../data/pose_metrics2.csv"

In [5]:
# Short aliases for the MediaPipe Tasks classes used when configuring and
# running the landmarker in the cells below.
BaseOptions = mp.tasks.BaseOptions
VisionRunningMode = mp.tasks.vision.RunningMode
PoseLandmarkerOptions = mp.tasks.vision.PoseLandmarkerOptions
PoseLandmarker = mp.tasks.vision.PoseLandmarker

In [6]:
# Landmarker configuration consumed by PoseLandmarker.create_from_options().
options = PoseLandmarkerOptions(
    # Model file to load.
    base_options=BaseOptions(model_asset_path=model_path),
    # VIDEO mode: frames are submitted one at a time together with a timestamp
    # (see the detect_for_video call in the processing cell).
    running_mode=VisionRunningMode.VIDEO,
    # Segmentation masks are not used anywhere in this notebook.
    output_segmentation_masks=False,
    # Pose-count limit requested from the landmarker.
    num_poses=2,
    # All three confidence thresholds share the same value (0.8).
    min_pose_detection_confidence=0.8,
    min_pose_presence_confidence=0.8,
    min_tracking_confidence=0.8,
)

In [7]:
def draw_landmarks_on_image(rgb_image, detection_result):
    """Overlay every detected pose skeleton on a copy of ``rgb_image``.

    Args:
        rgb_image: Image array with three axes (height, width, channels); it is
            only read, never drawn on.
        detection_result: Object exposing ``pose_landmarks``, an iterable of
            poses where each pose is a sequence of landmarks carrying
            normalized ``x`` / ``y`` attributes.

    Returns:
        ``rgb_image`` itself (not a copy) when ``pose_landmarks`` is empty or
        falsy; otherwise a new annotated image with the skeleton edges drawn as
        lines and every landmark drawn as a filled circle.
    """
    poses = detection_result.pose_landmarks
    if not poses:
        return rgb_image

    canvas = rgb_image.copy()
    img_h, img_w, _ = canvas.shape

    # Pairs of landmark indices that are joined by a line segment.
    skeleton_edges = (
        (0, 1), (1, 2), (2, 3), (3, 7), (0, 4), (4, 5), (5, 6), (6, 8),
        (9, 10), (11, 12), (11, 13), (13, 15), (15, 17), (15, 19), (15, 21),
        (17, 19), (12, 14), (14, 16), (16, 18), (16, 20), (16, 22), (18, 20),
        (11, 23), (12, 24), (23, 24), (23, 25), (24, 26), (25, 27), (26, 28),
        (27, 29), (28, 30), (29, 31), (30, 32), (27, 31), (28, 32),
    )

    for pose in poses:
        # Scale normalized coordinates up to integer pixel positions.
        pixels = [(int(lm.x * img_w), int(lm.y * img_h)) for lm in pose]
        n_points = len(pixels)

        # Skeleton edges first, so the landmark dots end up on top of them.
        for a, b in skeleton_edges:
            if a >= n_points or b >= n_points:
                continue  # this pose has no landmark at one of the endpoints
            cv2.line(canvas, pixels[a], pixels[b], (0, 255, 0), 2)

        # Filled circle (thickness -1) on every landmark.
        for px in pixels:
            cv2.circle(canvas, px, 5, (0, 0, 255), -1)

    return canvas



In [8]:
# Run the pose landmarker over every frame of VIDEO_INPUT_PATH, write an
# annotated copy of the clip to VIDEO_OUTPUT_PATH, and export one CSV row per
# (frame, detected person) to CSV_OUTPUT_PATH.
#
# Raises:
#     RuntimeError: if the input video cannot be opened, reports a
#         non-positive frame rate, or the output writer cannot be opened.
with PoseLandmarker.create_from_options(options) as landmarker:
    cap = cv2.VideoCapture(VIDEO_INPUT_PATH)
    out = None  # created only once the input properties are known
    pose_data = []

    try:
        if not cap.isOpened():
            # Raise instead of calling exit(): in a notebook, exit() targets the
            # kernel rather than cleanly aborting this cell, and the capture
            # would never be released.
            raise RuntimeError(f"Error: Could not open video file {VIDEO_INPUT_PATH}")

        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        print(f"Video properties: {frame_width}x{frame_height} @ {fps} FPS, {total_frames} frames")

        if fps <= 0:
            # fps is the divisor of the per-frame timestamp below and the frame
            # rate of the output writer, so a non-positive value is unusable.
            raise RuntimeError(f"Error: Invalid frame rate ({fps}) reported for {VIDEO_INPUT_PATH}")

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(VIDEO_OUTPUT_PATH, fourcc, fps, (frame_width, frame_height))
        if not out.isOpened():
            # Fail loudly instead of processing the whole clip and writing nothing.
            raise RuntimeError(f"Error: Could not open video writer for {VIDEO_OUTPUT_PATH}")

        frame_count = 0
        # Fall back to an open-ended progress bar when the reported frame count
        # is not a positive number.
        with tqdm(total=total_frames if total_frames > 0 else None, desc="Processing") as pbar:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                # Timestamp (ms) derived from the frame index; it grows with
                # every frame passed to detect_for_video.
                frame_timestamp_ms = int(frame_count * 1000 / fps)
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

                mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)
                pose_landmarker_result = landmarker.detect_for_video(mp_image, frame_timestamp_ms)

                # Annotate in RGB, then convert back to BGR for the writer.
                annotated_image = draw_landmarks_on_image(rgb_frame, pose_landmarker_result)
                annotated_bgr = cv2.cvtColor(annotated_image, cv2.COLOR_RGB2BGR)
                out.write(annotated_bgr)

                if pose_landmarker_result.pose_landmarks:
                    # person_id is the position of the pose inside this frame's
                    # result list, not an identity tracked across frames.
                    for person_idx, pose_landmarks in enumerate(pose_landmarker_result.pose_landmarks):
                        frame_data = {
                            'frame': frame_count,
                            'timestamp_ms': frame_timestamp_ms,
                            'person_id': person_idx
                        }
                        for landmark_idx, landmark in enumerate(pose_landmarks):
                            frame_data[f'landmark_{landmark_idx}_x'] = landmark.x
                            frame_data[f'landmark_{landmark_idx}_y'] = landmark.y
                            frame_data[f'landmark_{landmark_idx}_z'] = landmark.z
                            frame_data[f'landmark_{landmark_idx}_visibility'] = landmark.visibility

                        pose_data.append(frame_data)

                frame_count += 1
                pbar.update(1)
    finally:
        # Release video resources even when processing fails part-way, so the
        # input handle is freed and the output file is closed.
        cap.release()
        if out is not None:
            out.release()

    # Save pose data to CSV
    if pose_data:
        df = pd.DataFrame(pose_data)
        df.to_csv(CSV_OUTPUT_PATH, index=False)
        print(f"\nPose metrics saved to: {CSV_OUTPUT_PATH}")
        print(f"Total records: {len(df)}")
    else:
        print("\nNo pose landmarks detected in the video.")

    print(f"Annotated video saved to: {VIDEO_OUTPUT_PATH}")
    print("Processing complete!")

Video properties: 2560x1440 @ 25.0 FPS, 385 frames


Processing: 100%|██████████| 385/385 [01:27<00:00,  4.38it/s]



Pose metrics saved to: ../data/pose_metrics2.csv
Total records: 682
Annotated video saved to: ../data/pose_output2.mp4
Processing complete!
