In [5]:
import torch
import requests
import numpy as np

from PIL import Image

from transformers import (
    AutoProcessor,
    RTDetrForObjectDetection,
    VitPoseForPoseEstimation,
)

# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

In [6]:
# ------------------------------------------------------------------------
# Stage 1. Detect humans on the image
# ------------------------------------------------------------------------

# Any object detector can be used for this stage; this notebook uses RT-DETR.
# The processor and the model are loaded from the same checkpoint, and the
# model weights are placed on `device`.
person_image_processor, person_model = (
    AutoProcessor.from_pretrained("PekingU/rtdetr_r50vd_coco_o365"),
    RTDetrForObjectDetection.from_pretrained(
        "PekingU/rtdetr_r50vd_coco_o365",
        device_map=device,
    ),
)

In [7]:
# ------------------------------------------------------------------------
# Stage 2. Detect keypoints for each person found
# ------------------------------------------------------------------------

# The pose processor and the ViTPose model come from the same checkpoint;
# the model weights are placed on `device`.
image_processor, model = (
    AutoProcessor.from_pretrained("usyd-community/vitpose-base-simple"),
    VitPoseForPoseEstimation.from_pretrained(
        "usyd-community/vitpose-base-simple",
        device_map=device,
    ),
)

In [8]:
import cv2
import os
from tqdm import tqdm
import matplotlib.pyplot as plt

def process_video_pose_estimation(input_video_path, output_video_path, detection_threshold=0.3, pose_threshold=0.3):
    """
    Process a video file for pose estimation and overlay keypoints on each frame

    Every frame is run through the notebook-level person detector
    (`person_image_processor` / `person_model`). Boxes whose label id is 0 are
    forwarded to the pose model (`image_processor` / `model`) and the result is
    drawn onto the frame with `draw_poses_on_frame`. Frames without any such
    box are written to the output unchanged.

    Args:
        input_video_path: Path to input video file (anything accepted by
            `cv2.VideoCapture`)
        output_video_path: Path to save output video (written with the 'mp4v' codec)
        detection_threshold: Confidence threshold for person detection
        pose_threshold: Confidence threshold for pose keypoints

    Returns:
        None. Problems opening the input or output are reported with `print`
        and the function returns early.
    """
    cap = cv2.VideoCapture(input_video_path)
    out = None

    # try/finally guarantees the capture and the writer are released even if
    # a frame fails half-way through the video.
    try:
        if not cap.isOpened():
            print(f"Error: Could not open video file {input_video_path}")
            return

        # Keep the frame rate as a float: truncating e.g. 29.97 to 29 changes
        # the playback speed of the output. Some sources report 0 (or NaN) when
        # the rate is unknown, which the writer cannot use, so fall back to 30.
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            fps = 30.0
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        print(f"Video properties: {width}x{height}, {fps:g} FPS, {total_frames} frames")

        # Define codec and create VideoWriter
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
        if not out.isOpened():
            print(f"Error: Could not open output video file {output_video_path}")
            return

        # Colors for different people. OpenCV expects BGR order, so the
        # channels are (blue, green, red).
        colors = [
            (0, 0, 255),      # Red
            (255, 0, 0),      # Blue
            (0, 255, 0),      # Green
            (0, 255, 255),    # Yellow
            (255, 0, 255),    # Magenta
            (0, 165, 255),    # Orange
            (203, 192, 255),  # Pink
            (42, 42, 165)     # Brown
        ]

        # COCO skeleton connections. draw_poses_on_frame looks these indices
        # up by keypoint label id, so they are written 0-based
        # (0 = nose ... 16 = right ankle) rather than in the 1-based numbering
        # of the COCO annotation files.
        skeleton = [
            [15, 13], [13, 11], [16, 14], [14, 12], [11, 12],  # legs and hips
            [5, 11], [6, 12], [5, 6], [5, 7], [6, 8],          # torso and upper arms
            [7, 9], [8, 10], [1, 2], [0, 1], [0, 2],           # forearms and face
            [1, 3], [2, 4], [3, 5], [4, 6]                     # ears to shoulders
        ]

        # A non-positive frame count means the total is unknown; tqdm then
        # shows a plain counter instead of a percentage bar.
        progress_total = total_frames if total_frames > 0 else None

        with tqdm(total=progress_total, desc="Processing frames") as pbar:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # Convert BGR to RGB for PIL
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                pil_image = Image.fromarray(frame_rgb)

                # Stage 1: Detect humans
                inputs = person_image_processor(images=pil_image, return_tensors="pt").to(device)

                with torch.no_grad():
                    outputs = person_model(**inputs)

                results = person_image_processor.post_process_object_detection(
                    outputs, target_sizes=torch.tensor([(pil_image.height, pil_image.width)]),
                    threshold=detection_threshold
                )
                result = results[0]

                # Keep only the boxes labelled 0 (the person class for this detector)
                person_boxes = result["boxes"][result["labels"] == 0]

                if len(person_boxes) > 0:
                    person_boxes = person_boxes.cpu().numpy()

                    # Convert boxes from VOC (x1, y1, x2, y2) to COCO (x1, y1, w, h) format
                    person_boxes[:, 2] = person_boxes[:, 2] - person_boxes[:, 0]
                    person_boxes[:, 3] = person_boxes[:, 3] - person_boxes[:, 1]

                    # Stage 2: Detect keypoints
                    inputs = image_processor(pil_image, boxes=[person_boxes], return_tensors="pt").to(device)

                    with torch.no_grad():
                        outputs = model(**inputs)

                    pose_results = image_processor.post_process_pose_estimation(
                        outputs, boxes=[person_boxes], threshold=pose_threshold
                    )
                    image_pose_result = pose_results[0]

                    # Draw poses on frame
                    frame = draw_poses_on_frame(frame, image_pose_result, person_boxes,
                                                colors, skeleton, pose_threshold)

                # Write frame to output video
                out.write(frame)
                pbar.update(1)
    finally:
        # Release everything
        cap.release()
        if out is not None:
            out.release()

    print(f"Video processing complete! Output saved to: {output_video_path}")

def draw_poses_on_frame(frame, pose_results, boxes, colors, skeleton, min_score=0.3, id2label=None):
    """
    Draw pose keypoints and skeleton on a frame using OpenCV

    For every (pose, box) pair this draws, in order: the bounding box, the
    skeleton lines, and a filled circle plus a text label per keypoint. The
    frame is modified in place and also returned.

    Args:
        frame: Image array to draw on (passed straight to the cv2 drawing calls)
        pose_results: Iterable of per-person dicts with "keypoints", "labels"
            and "scores" entries
        boxes: Per-person boxes as (x, y, w, h), paired with `pose_results`
            by position
        colors: Sequence of color tuples; person i uses colors[i % len(colors)]
        skeleton: Pairs of keypoint ids to connect. The ids are matched
            directly against the values in "labels", so they must use the same
            numbering as those labels.
        min_score: Keypoints scoring below this value are not drawn
        id2label: Optional mapping from label id to keypoint name used for the
            text labels. Defaults to the notebook-level `model.config.id2label`.

    Returns:
        The same `frame` object, with the overlays drawn on it.
    """
    if id2label is None:
        # Backward-compatible default: names come from the notebook's pose model.
        id2label = model.config.id2label

    # Draw bounding boxes and keypoints for each person
    for person_idx, (person_pose, box) in enumerate(zip(pose_results, boxes)):
        color = colors[person_idx % len(colors)]

        # Draw bounding box (plain Python ints are accepted by every OpenCV version)
        box_x, box_y, box_w, box_h = (int(value) for value in box)
        cv2.rectangle(frame, (box_x, box_y), (box_x + box_w, box_y + box_h), color, 2)

        # Collect the keypoints that pass the score threshold once, converting
        # them to plain Python numbers so both drawing passes can reuse them.
        visible = []
        for keypoint, label, score in zip(
            person_pose["keypoints"], person_pose["labels"], person_pose["scores"]
        ):
            score_value = score.item()
            if score_value >= min_score:
                visible.append(
                    (label.item(), int(keypoint[0].item()), int(keypoint[1].item()), score_value)
                )

        # Label id -> pixel position, for looking up skeleton end points
        position_by_label = {label_id: (kx, ky) for label_id, kx, ky, _ in visible}

        # Draw skeleton connections; a limb is drawn only when both of its
        # end points passed the threshold.
        for start_idx, end_idx in skeleton:
            if start_idx in position_by_label and end_idx in position_by_label:
                cv2.line(frame, position_by_label[start_idx], position_by_label[end_idx], color, 2)

        # Draw keypoints on top of the skeleton lines
        for label_id, kx, ky, score_value in visible:
            # Size based on confidence score
            radius = max(3, int(5 + (score_value * 5)))
            cv2.circle(frame, (kx, ky), radius, color, -1)
            cv2.circle(frame, (kx, ky), radius, (255, 255, 255), 1)  # White border

            # Add the keypoint name next to the marker
            keypoint_name = id2label[label_id]
            cv2.putText(frame, keypoint_name, (kx + 5, ky - 5),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.3, (255, 255, 255), 1)

    return frame


In [9]:
# Demo: run the pose-estimation pipeline on a video file.
# Required extra packages:
# pip install opencv-python tqdm matplotlib

# Example 1: Process a video file
input_video = "input_video.mp4"  # Replace with your video file path
output_video = "output_video_with_poses.mp4"

# Only start processing when the input path actually exists
if not os.path.exists(input_video):
    print(f"Video file {input_video} not found. Please provide a valid video file path.")
else:
    process_video_pose_estimation(
        input_video_path=input_video,
        output_video_path=output_video,
        detection_threshold=0.3,  # Confidence threshold for person detection
        pose_threshold=0.3,       # Confidence threshold for pose keypoints
    )

# Example 2: Process a video from webcam (live processing)
# Uncomment the following lines to process webcam feed:
# process_video_pose_estimation(
#     input_video_path=0,  # 0 for default webcam
#     output_video_path="webcam_poses.mp4",
#     detection_threshold=0.3,
#     pose_threshold=0.3
# )


Video properties: 1620x1080, 30 FPS, 317 frames


Processing frames:   0%|          | 0/317 [00:00<?, ?it/s]

Processing frames: 100%|██████████| 317/317 [00:39<00:00,  7.97it/s]

Video processing complete! Output saved to: output_video_with_poses.mp4





In [None]:
def real_time_pose_estimation(camera_id=0, detection_threshold=0.3, pose_threshold=0.3):
    """
    Real-time pose estimation from webcam feed with live display
    Press 'q' to quit

    Each captured frame is run through the notebook-level person detector
    (`person_image_processor` / `person_model`); boxes with label id 0 are
    forwarded to the pose model (`image_processor` / `model`), drawn with
    `draw_poses_on_frame`, and the frame is shown in an OpenCV window.

    Args:
        camera_id: Camera index (or other source) passed to `cv2.VideoCapture`
        detection_threshold: Confidence threshold for person detection
        pose_threshold: Confidence threshold for pose keypoints

    Returns:
        None. If the camera cannot be opened an error is printed and the
        function returns immediately.
    """
    cap = cv2.VideoCapture(camera_id)

    if not cap.isOpened():
        cap.release()
        print(f"Error: Could not open camera {camera_id}")
        return

    # Colors for different people. OpenCV expects BGR order, so the channels
    # are (blue, green, red).
    colors = [
        (0, 0, 255),      # Red
        (255, 0, 0),      # Blue
        (0, 255, 0),      # Green
        (0, 255, 255),    # Yellow
        (255, 0, 255),    # Magenta
        (0, 165, 255),    # Orange
        (203, 192, 255),  # Pink
        (42, 42, 165)     # Brown
    ]

    # COCO skeleton connections. draw_poses_on_frame looks these indices up by
    # keypoint label id, so they are written 0-based
    # (0 = nose ... 16 = right ankle) rather than in the 1-based numbering of
    # the COCO annotation files.
    skeleton = [
        [15, 13], [13, 11], [16, 14], [14, 12], [11, 12],  # legs and hips
        [5, 11], [6, 12], [5, 6], [5, 7], [6, 8],          # torso and upper arms
        [7, 9], [8, 10], [1, 2], [0, 1], [0, 2],           # forearms and face
        [1, 3], [2, 4], [3, 5], [4, 6]                     # ears to shoulders
    ]

    print("Real-time pose estimation started. Press 'q' to quit.")

    # try/finally guarantees the camera and the display window are released
    # even if processing a frame raises.
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Convert BGR to RGB for PIL
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            pil_image = Image.fromarray(frame_rgb)

            # Stage 1: Detect humans
            inputs = person_image_processor(images=pil_image, return_tensors="pt").to(device)

            with torch.no_grad():
                outputs = person_model(**inputs)

            results = person_image_processor.post_process_object_detection(
                outputs, target_sizes=torch.tensor([(pil_image.height, pil_image.width)]),
                threshold=detection_threshold
            )
            result = results[0]

            # Keep only the boxes labelled 0 (the person class for this detector)
            person_boxes = result["boxes"][result["labels"] == 0]

            if len(person_boxes) > 0:
                person_boxes = person_boxes.cpu().numpy()

                # Convert boxes from VOC (x1, y1, x2, y2) to COCO (x1, y1, w, h) format
                person_boxes[:, 2] = person_boxes[:, 2] - person_boxes[:, 0]
                person_boxes[:, 3] = person_boxes[:, 3] - person_boxes[:, 1]

                # Stage 2: Detect keypoints
                inputs = image_processor(pil_image, boxes=[person_boxes], return_tensors="pt").to(device)

                with torch.no_grad():
                    outputs = model(**inputs)

                pose_results = image_processor.post_process_pose_estimation(
                    outputs, boxes=[person_boxes], threshold=pose_threshold
                )
                image_pose_result = pose_results[0]

                # Draw poses on frame
                frame = draw_poses_on_frame(frame, image_pose_result, person_boxes,
                                            colors, skeleton, pose_threshold)

            # Display the frame
            cv2.imshow('Real-time Pose Estimation', frame)

            # Break the loop if 'q' is pressed
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Release everything
        cap.release()
        cv2.destroyAllWindows()

    print("Real-time pose estimation stopped.")

# Example usage for real-time processing:
# Uncomment the following line to start real-time pose estimation
# real_time_pose_estimation(camera_id=0, detection_threshold=0.3, pose_threshold=0.3)


Error: Could not open camera 0


[ WARN:0@53.655] global cap_v4l.cpp:913 open VIDEOIO(V4L2:/dev/video0): can't open camera by index
[ERROR:0@53.655] global obsensor_uvc_stream_channel.cpp:158 getStreamChannelGroup Camera index out of range
