# Pose Landmarks Detection with MediaPipe Tasks
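This notebook shows how to use the MediaPipe Tasks Python API to detect pose landmarks in videos: each configured video is written out again with the detected skeleton drawn on it, and a feature vector is extracted for every frame in which a pose is found.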

In [1]:
import cv2
import mediapipe as mp
import numpy as np
import os
import json
import csv
from mediapipe.tasks import python
from mediapipe.tasks.python import vision

In [None]:
def load_video_jobs(filepath):
    """
    Load the list of video processing jobs from a JSON or CSV config file.

    Args:
        filepath: Path to the config file (str or path-like). The file type
            is chosen by its suffix, matched case-insensitively: ``.json``
            or ``.csv``.

    Returns:
        list: One mapping per job. For JSON this is the parsed top-level
        array. For CSV it is one dict per data row (keyed by the header
        line); rows whose 'input' column is missing or empty are skipped.

    Raises:
        FileNotFoundError: If ``filepath`` does not exist.
        ValueError: If the suffix is neither .json nor .csv, or if the JSON
            document is not a top-level array.
    """
    if not os.path.exists(filepath):
        raise FileNotFoundError(f"Config file '{filepath}' not found.")

    lowered_name = os.fspath(filepath).lower()

    if lowered_name.endswith('.json'):
        # 'utf-8-sig' drops an optional byte-order mark, which json.load
        # would otherwise reject; files without a BOM decode as plain UTF-8.
        with open(filepath, 'r', encoding='utf-8-sig') as f:
            jobs = json.load(f)
        if not isinstance(jobs, list):
            raise ValueError("JSON config must contain a list of job objects.")
        return jobs

    if lowered_name.endswith('.csv'):
        # Without BOM stripping the first header would read '\ufeffinput'
        # and every row would be filtered out below. newline='' lets the csv
        # module handle line endings inside quoted fields itself.
        with open(filepath, 'r', encoding='utf-8-sig', newline='') as f:
            return [row for row in csv.DictReader(f) if row.get('input')]

    raise ValueError("Config file not of type .json or .csv.")

# Relative path of the pose landmarker model bundle loaded in main().
MODEL_PATH = 'pose_landmarker.task'

# Skeleton edges as (start_index, end_index) pairs into one pose's landmark list.
# NOTE(review): the region labels below assume MediaPipe's 33-point pose
# numbering - confirm against the model documentation.
POSE_CONNECTIONS = frozenset((
    # head and face
    (0, 1), (1, 2), (2, 3), (3, 7),
    (0, 4), (4, 5), (5, 6), (6, 8),
    (9, 10),
    # shoulder line
    (11, 12),
    # arm and hand chain starting at landmark 11
    (11, 13), (13, 15), (15, 17), (15, 19), (15, 21), (17, 19),
    # arm and hand chain starting at landmark 12
    (12, 14), (14, 16), (16, 18), (16, 20), (16, 22), (18, 20),
    # torso
    (11, 23), (12, 24), (23, 24),
    # legs and feet
    (23, 25), (24, 26), (25, 27), (26, 28), (27, 29),
    (28, 30), (29, 31), (30, 32), (27, 31), (28, 32),
))

def extract_features(landmarks):
    """
    Flatten the landmarks of one pose into a single 1-D feature array.

    Every landmark contributes its ``x``, ``y``, ``z`` and ``visibility``
    attributes, in that order, so the layout is
    [x1, y1, z1, vis1, x2, y2, ...].

    Args:
        landmarks: Sequence of landmark objects, or an empty/None value.

    Returns:
        np.ndarray: 4 values per landmark. When ``landmarks`` is empty or
        None, a zero array of length 33 * 4 = 132 is returned instead.
    """
    if landmarks:
        return np.array([
            value
            for point in landmarks
            for value in (point.x, point.y, point.z, point.visibility)
        ])

    # No pose available: fall back to an all-zero vector of the full size.
    return np.zeros(33 * 4)

def draw_landmarks_on_image(rgb_image, detection_result):
    """
    Return a copy of ``rgb_image`` with all detected poses drawn onto it.

    For each pose in ``detection_result.pose_landmarks`` every landmark is
    drawn as a filled green circle (radius 5) and every edge listed in
    POSE_CONNECTIONS as a white line (thickness 2). Landmark ``x``/``y``
    values are multiplied by the image width/height to get pixel positions.

    Args:
        rgb_image: Image array; it is copied, not modified.
        detection_result: Object with a ``pose_landmarks`` sequence holding
            one landmark sequence per detected pose.

    Returns:
        np.ndarray: The annotated copy (unchanged copy if no pose is present).
    """
    poses = detection_result.pose_landmarks
    canvas = np.copy(rgb_image)

    for pose in poses:
        # The image size is only needed (and only unpacked) when there is
        # at least one pose to draw.
        img_h, img_w, _ = canvas.shape

        def to_pixel(point):
            # Scale the landmark's x/y by the image size, truncated to int.
            return int(point.x * img_w), int(point.y * img_h)

        # Joints first ...
        for point in pose:
            cv2.circle(canvas, to_pixel(point), 5, (0, 255, 0), -1)

        # ... then the skeleton edges on top.
        for first, second in POSE_CONNECTIONS:
            edge_start = pose[first]
            edge_end = pose[second]
            cv2.line(canvas, to_pixel(edge_start), to_pixel(edge_end), (255, 255, 255), 2)

    return canvas

def process_single_video(job_config, detector):
    """
    Run pose detection on one video, write an annotated copy and collect features.

    Every frame is passed to ``detector.detect_for_video`` together with a
    timestamp derived from the frame index and the video's frame rate. The
    first detected pose of a frame (if any) is turned into a feature vector
    via ``extract_features``; frames without a detection contribute no
    vector. All frames, annotated via ``draw_landmarks_on_image``, are
    written to the output video.

    Args:
        job_config: Mapping with the keys 'input' (source video path) and
            'output' (annotated video path). 'label' is optional and
            defaults to "unknown".
        detector: Object exposing ``detect_for_video(image, timestamp_ms)``.
            Timestamps restart at 0 on every call of this function.
            NOTE(review): MediaPipe's VIDEO mode expects increasing
            timestamps per landmarker instance, so a fresh detector per
            video is presumably required - confirm against the API docs.

    Returns:
        dict | None: ``{"label", "features", "source_file"}`` where
        "features" is ``np.array`` of the collected per-frame vectors, or
        None if the input file is missing or cannot be opened.

    Raises:
        KeyError: If 'input' or 'output' is missing from ``job_config``.
    """
    input_path = job_config["input"]
    output_path = job_config["output"]

    if not os.path.exists(input_path):
        print(f"WARNING: File '{input_path}' not found. Skipping...")
        return None

    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        print(f"ERROR: Could not open '{input_path}'.")
        cap.release()
        return None

    out = None
    frame_data = []

    try:
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        # Fall back to 30 fps for any unusable value (0, negative or NaN);
        # the frame rate is used as a divisor for the timestamps below.
        if not fps > 0:
            fps = 30

        # dirname() is '' for a bare file name, and os.makedirs('') raises.
        output_dir = os.path.dirname(output_path)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            # Feature extraction is still useful without the annotated copy.
            print(f"WARNING: Could not open '{output_path}' for writing. "
                  "No annotated video will be saved.")

        frame_index = 0

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            mp_image = mp.Image(image_format=mp.ImageFormat.SRGB, data=rgb_frame)
            timestamp_ms = int((frame_index * 1000) / fps)

            detection_result = detector.detect_for_video(mp_image, timestamp_ms)

            if detection_result.pose_landmarks:
                # Only the first detected pose of the frame is used.
                frame_data.append(extract_features(detection_result.pose_landmarks[0]))
            # Frames without person detected are skipped.
            # ToDo: how to handle missing data? LSTM prefers nulls or last known value.

            annotated_frame = draw_landmarks_on_image(rgb_frame, detection_result)
            out.write(cv2.cvtColor(annotated_frame, cv2.COLOR_RGB2BGR))

            frame_index += 1
    finally:
        # Release the capture and writer even if detection or drawing raised.
        cap.release()
        if out is not None:
            out.release()

    print(f"  -> {input_path} fertig. {len(frame_data)} Frames mit Pose extrahiert.")

    return {
        "label": job_config.get("label", "unknown"),
        "features": np.array(frame_data),
        "source_file": input_path
    }

def main(config_path='video_jobs.json', model_path=None):
    """
    Process every video listed in the job config and print a summary.

    A new pose landmarker is created for each video. ``process_single_video``
    restarts its frame timestamps at 0 for every video, while a landmarker
    in VIDEO running mode expects increasing timestamps across its calls,
    so one instance must not be shared between videos. A failure in one
    video is reported and the remaining jobs are still processed.

    Args:
        config_path: Path of the JSON/CSV job config passed to
            ``load_video_jobs``.
        model_path: Path of the pose landmarker model; defaults to
            MODEL_PATH when None or empty.

    Returns:
        list: The result dicts of all successfully processed videos (empty
        if the config could not be loaded).
    """
    base_options = python.BaseOptions(model_asset_path=model_path or MODEL_PATH)
    options = vision.PoseLandmarkerOptions(
        base_options=base_options,
        output_segmentation_masks=False,
        running_mode=vision.RunningMode.VIDEO
    )

    all_training_data = []

    try:
        video_jobs = load_video_jobs(config_path)
        print(f"Loading of config: {len(video_jobs)} videos to process.")
    except Exception as e:
        print(f"Error loading configuration: {e}")
        return all_training_data

    for job in video_jobs:
        try:
            # Fresh landmarker per video: timestamps start again at 0 and no
            # state from the previous video is carried over.
            with vision.PoseLandmarker.create_from_options(options) as detector:
                result = process_single_video(job, detector)
        except Exception as e:
            # Report the failing job and keep going with the others.
            print(f"Critical error in main while processing job {job!r}: {e}")
            continue

        if result is not None:
            all_training_data.append(result)

    print("\n--- Summary ---")
    print(f"Number of successfully processed videos: {len(all_training_data)}")

    for entry in all_training_data:
        print(f"Video: {entry['source_file']} | Label: {entry['label']} | Data shape: {entry['features'].shape}")

    return all_training_data

# Entry point: run the whole pipeline when this code is executed as the main
# module (script run, or executing this cell in the notebook).
if __name__ == "__main__":
    main()

Loading of config: 2 videos to process.
  -> videos/CarvingSkier.mp4 fertig. 60 Frames mit Pose extrahiert.

--- Summary ---
Number of successfully processed videos: 1
Video: videos/CarvingSkier.mp4 | Label: carving_skier | Data shape: (60, 132)
