In [None]:
# Installation and Setup
import subprocess
import sys
import os

def install_package(package):
    """Install ``package`` with pip through the currently running interpreter.

    Args:
        package: Requirement string forwarded verbatim to ``pip install``.

    Raises:
        subprocess.CalledProcessError: If the pip process exits non-zero.
    """
    pip_command = [sys.executable, "-m", "pip", "install"]
    pip_command.append(package)
    subprocess.check_call(pip_command)

# Install required packages
try:
    import mmpose
    print("MMPose already installed")
except ImportError:
    print("Installing MMPose and dependencies...")
    install_package("openmim")
    # Same packages, same order as before: mmengine, mmcv, mmdet, then mmpose.
    for mim_package in ("mmengine", "mmcv", "mmdet", "mmpose"):
        subprocess.check_call([sys.executable, "-m", "mim", "install", mim_package])

# Install additional dependencies
required_packages = [
    "opencv-python",
    "matplotlib",
    "numpy",
    "torch",
    "torchvision",
    "Pillow",
    "imageio",
    "scipy"
]

# pip distribution name -> importable module name, for the packages where the
# two differ. Without this mapping the presence check tried to import
# "opencv_python" and "Pillow", which never succeed, so both packages were
# re-installed on every run of the cell.
package_import_names = {
    "opencv-python": "cv2",
    "Pillow": "PIL",
}

for package in required_packages:
    module_name = package_import_names.get(package, package.replace("-", "_"))
    try:
        __import__(module_name)
    except ImportError:
        print(f"Installing {package}...")
        install_package(package)

print("All packages installed successfully!")


In [None]:
# Import necessary libraries
import cv2
import numpy as np
import matplotlib.pyplot as plt
import torch
import os
import time
from pathlib import Path
import json

# MMPose imports
from mmpose.apis import MMPoseInferencer
from mmpose.apis import inference_topdown
from mmpose.apis import init_model as init_pose_estimator
from mmpose.utils import register_all_modules

# Register all modules
register_all_modules()

# Report the runtime environment so later failures are easier to diagnose.
print("Libraries imported successfully!")
print(f"PyTorch version: {torch.__version__}")
cuda_ready = torch.cuda.is_available()
print(f"CUDA available: {cuda_ready}")
if cuda_ready:
    print(f"CUDA device: {torch.cuda.get_device_name(0)}")


In [None]:
# Model Configuration for RTMWD (RTM-Pose WholeBody Dense)
class RTMWDConfig:
    """Holds model identifiers, the compute device, and the visualization,
    video and 3D-pose options as plain instance attributes."""

    def __init__(self):
        # Model identifiers passed to the pose inferencer.
        self.pose2d_model = 'rtmpose-l_8xb32-270e_coco-wholebody-384x288'
        self.pose3d_model = 'simcc3d_res50_8xb32-270e_h36m-256x192'

        # Run on the GPU whenever PyTorch reports one, otherwise on the CPU.
        if torch.cuda.is_available():
            self.device = 'cuda'
        else:
            self.device = 'cpu'

        # Visualization settings
        self.draw_bbox = True
        self.draw_heatmap = False
        self.show_keypoint_weight = True
        self.skeleton_style = 'mmpose'

        # Video settings
        self.fps = self.output_fps = 30

        # 3D pose settings
        self.online = True
        self.use_multi_frames = True

    def __str__(self):
        """Return a multi-line, human-readable summary of the key settings."""
        pad = " " * 8
        summary_lines = [
            "RTMWD Configuration:",
            f"{pad}- 2D Pose Model: {self.pose2d_model}",
            f"{pad}- 3D Pose Model: {self.pose3d_model}",
            f"{pad}- Device: {self.device}",
            f"{pad}- FPS: {self.fps}",
            f"{pad}- Multi-frame: {self.use_multi_frames}",
            pad,  # the summary ends with an indented, unterminated line
        ]
        return "\n".join(summary_lines)

# Build the configuration once at cell execution; the RTMWD initialization
# cell passes this `config` object to RTMWDInferencer. Printing it shows the
# summary produced by RTMWDConfig.__str__.
config = RTMWDConfig()
print(config)


In [None]:
# Initialize RTMWD Models
class RTMWDInferencer:
    """Thin wrapper around ``MMPoseInferencer`` for one-image-at-a-time use.

    Attributes:
        config: The configuration object given to the constructor.
        inferencer: The underlying ``MMPoseInferencer`` instance.
    """

    def __init__(self, config):
        """Load the 2D and 3D pose models named in ``config``.

        Args:
            config: Object exposing ``pose2d_model``, ``pose3d_model`` and
                ``device`` attributes.
        """
        self.config = config
        print("Initializing RTMWD models...")

        # Initialize the MMPose inferencer with wholebody pose detection
        self.inferencer = MMPoseInferencer(
            pose2d=config.pose2d_model,
            pose3d=config.pose3d_model,
            device=config.device,
            show_progress=False
        )

        print("✓ Models loaded successfully!")

    def predict_pose_3d(self, image):
        """
        Predict 3D pose from a single image

        Args:
            image: Input handed unchanged to the inferencer.

        Returns:
            The result dict for ``image`` (the first item the inferencer
            yields), or ``None`` when inference raises or yields nothing.
        """
        try:
            # Run inference
            results = self.inferencer(image, show=False, return_vis=True)
            if isinstance(results, dict):
                return results
            # MMPoseInferencer.__call__ is a generator: nothing is computed
            # until it is iterated. Returning the generator itself made every
            # later `results.get(...)` fail, so pull the first result here.
            return next(iter(results), None)
        except Exception as e:
            print(f"Error in pose prediction: {e}")
            return None

    def visualize_results(self, image, results, show_3d=True):
        """
        Visualize pose estimation results

        Args:
            image: Original frame, returned as-is whenever no rendered
                visualization is available.
            results: Value returned by ``predict_pose_3d`` (dict or ``None``).
            show_3d: Accepted for interface compatibility; not used.

        Returns:
            The first rendered visualization (converted to BGR channel order
            when it is a 3-channel array), otherwise ``image``.
        """
        if results is None:
            return image

        try:
            # Get visualization from results
            rendered = results.get('visualization')
            if rendered is None or len(rendered) == 0:
                return image
            vis_image = rendered[0]
            # NOTE(review): MMPose 1.x inferencers appear to render in RGB
            # (their own save path converts back to BGR) -- confirm against
            # the installed version. The `image` fallback and the OpenCV
            # callers in this notebook work in BGR, so flip the channels to
            # keep one consistent order. ascontiguousarray makes the result
            # safe for in-place OpenCV drawing calls.
            if (isinstance(vis_image, np.ndarray) and vis_image.ndim == 3
                    and vis_image.shape[2] == 3):
                vis_image = np.ascontiguousarray(vis_image[..., ::-1])
            return vis_image
        except Exception as e:
            print(f"Error in visualization: {e}")
            return image

# Initialize the inferencer
# Any failure is reported instead of raised; in that case `rtmwd` is not bound.
try:
    rtmwd = RTMWDInferencer(config)
    print("RTMWD Inferencer initialized successfully!")
except Exception as init_error:
    print(
        f"Error initializing RTMWD: {init_error}",
        "Please check your MMPose installation and model configurations.",
        sep="\n",
    )


In [None]:
# Real-time Video Processing Functions
class VideoProcessor:
    """Feed frames from a webcam or a video file through an RTMWD inferencer.

    The wrapped object must provide ``predict_pose_3d(frame)`` and
    ``visualize_results(frame, results)``.
    """

    def __init__(self, rtmwd_inferencer):
        """Store the inferencer used by the processing methods."""
        self.rtmwd = rtmwd_inferencer
        # Not read by the methods of this class; kept as public attributes.
        self.frame_buffer = []
        self.pose_history = []

    def process_webcam(self, duration=30):
        """Process real-time webcam feed for pose estimation

        Frames from camera index 0 are mirrored, run through the inferencer
        and shown in an OpenCV window with an FPS overlay. The loop ends when
        the camera stops delivering frames, 'q' is pressed, ``duration``
        seconds have elapsed, or the kernel is interrupted.

        Args:
            duration: Maximum run time in seconds.
        """
        cap = cv2.VideoCapture(0)
        if not cap.isOpened():
            print("Error: Cannot open webcam")
            cap.release()
            return

        print(f"Starting webcam processing for {duration} seconds...")
        print("Press 'q' to quit early")

        start_time = time.time()
        frame_count = 0
        fps_counter = 0
        fps_start = time.time()
        fps = 0

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # Flip frame horizontally for mirror effect
                frame = cv2.flip(frame, 1)

                # Process frame with RTMWD
                results = self.rtmwd.predict_pose_3d(frame)

                # Visualize results
                vis_frame = self.rtmwd.visualize_results(frame, results)

                # Count the frame as soon as it has been processed, so the
                # frame on which the loop exits ('q' or timeout) is included.
                frame_count += 1

                # Calculate FPS (frames completed during the last full second)
                fps_counter += 1
                if time.time() - fps_start >= 1.0:
                    fps = fps_counter
                    fps_counter = 0
                    fps_start = time.time()

                # Display FPS on frame
                cv2.putText(vis_frame, f'FPS: {fps}', (10, 30),
                          cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

                # Display frame
                cv2.imshow('RTMWD Real-time 3D Pose', vis_frame)

                # Check for exit
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

                # Check duration
                if time.time() - start_time > duration:
                    break

        except KeyboardInterrupt:
            print("Stopped by user")
        finally:
            cap.release()
            cv2.destroyAllWindows()
            print(f"Processed {frame_count} frames")

    def process_video_file(self, video_path, output_path=None):
        """Run pose estimation on every frame of a video file.

        Args:
            video_path: Path of the video to read.
            output_path: Optional path for an annotated copy of the video.
                Nothing is written when it is ``None``.

        Returns:
            A list with one entry per frame read: the ``'predictions'`` value
            of that frame's result dict, or ``None`` when no result dict was
            produced for the frame. Returns ``None`` if the video cannot be
            opened. Rendered images are not kept, to bound memory use.
        """
        cap = cv2.VideoCapture(str(video_path))
        if not cap.isOpened():
            print(f"Error: Cannot open video file: {video_path}")
            cap.release()
            return None

        # Use the source frame rate; fall back to the configured output rate
        # when the container does not report a usable value (0 or NaN).
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            fps = getattr(getattr(self.rtmwd, 'config', None), 'output_fps', 30)

        writer = None
        frame_size = None
        frame_results = []

        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                results = self.rtmwd.predict_pose_3d(frame)
                vis_frame = self.rtmwd.visualize_results(frame, results)

                if output_path is not None:
                    if writer is None:
                        # Size the writer from the first rendered frame: the
                        # visualization need not match the input resolution.
                        height, width = vis_frame.shape[:2]
                        frame_size = (width, height)
                        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                        writer = cv2.VideoWriter(
                            str(output_path), fourcc, fps, frame_size)
                    elif (vis_frame.shape[1], vis_frame.shape[0]) != frame_size:
                        # VideoWriter needs a constant frame size.
                        vis_frame = cv2.resize(vis_frame, frame_size)
                    writer.write(vis_frame)

                if isinstance(results, dict):
                    frame_results.append(results.get('predictions'))
                else:
                    frame_results.append(None)
        finally:
            cap.release()
            if writer is not None:
                writer.release()

        return frame_results

# Create video processor
# `rtmwd` is unbound when the initialization cell failed or was never run.
try:
    video_processor = VideoProcessor(rtmwd)
except NameError:
    print("Please run the RTMWD initialization cell first")
else:
    print("Video processor initialized!")


In [None]:
# Real-time Webcam Processing Example
# UNCOMMENT THE LINE BELOW TO START WEBCAM PROCESSING
# video_processor.process_webcam(duration=30)  # Process for 30 seconds

print(
    "Webcam processing is ready!",
    "Uncomment the line above to start real-time processing.",
    "Make sure your webcam is connected and working.",
    sep="\n",
)


In [None]:
# Video File Processing Example
def process_sample_video(video_path, output_path=None):
    """
    Run the notebook-level ``video_processor`` over one video file.

    Args:
        video_path: Path of the input video; must exist on disk.
        output_path: Optional destination forwarded to
            ``video_processor.process_video_file``.

    Returns:
        Whatever ``video_processor.process_video_file`` returns, or ``None``
        when ``video_path`` does not exist.
    """
    if os.path.exists(video_path):
        print(f"Processing video: {video_path}")

        # Process the video
        results = video_processor.process_video_file(video_path, output_path)

        # Only report success for a non-empty result.
        if results:
            print(f"Successfully processed {len(results)} frames")
            if output_path:
                print(f"Output saved to: {output_path}")

        return results

    print(f"Video file not found: {video_path}")
    print("Please provide a valid video file path.")
    return None

# Example usage (uncomment and modify paths as needed):
# video_path = "path/to/your/input_video.mp4"
# output_path = "output_video_with_poses.mp4"
# results = process_sample_video(video_path, output_path)

print(
    "Video processing function is ready!",
    "Modify the paths above and uncomment to process a video file.",
    sep="\n",
)


In [None]:
# Test and Utility Functions
def test_single_image():
    """Test RTMWD on a single image

    Runs the notebook-level ``rtmwd`` inferencer on a synthetic frame and
    displays the visualization with matplotlib.

    Returns:
        True when inference produced a result and it could be displayed,
        False otherwise.
    """
    # Create a test image (you can replace this with loading an actual image)
    # 480x640 frame filled with mid-gray (128).
    test_image = np.full((480, 640, 3), 128, dtype=np.uint8)

    print("Testing RTMWD on a test image...")

    try:
        results = rtmwd.predict_pose_3d(test_image)
        # predict_pose_3d reports errors by returning None instead of
        # raising, so a None result has to be treated as a failure here;
        # otherwise this test would announce success for a broken pipeline.
        if results is None:
            print("✗ Single image test failed: pose prediction returned no result")
            return False

        vis_image = rtmwd.visualize_results(test_image, results)

        print("✓ Single image test successful!")

        # Display the result
        fig, ax = plt.subplots(figsize=(10, 6))
        ax.imshow(cv2.cvtColor(vis_image, cv2.COLOR_BGR2RGB))
        ax.set_title("RTMWD Test Result")
        ax.axis('off')
        plt.show()

        return True
    except Exception as e:
        print(f"✗ Single image test failed: {e}")
        return False

def check_system_requirements():
    """Check if the system meets the requirements for real-time processing

    Prints the status of three things: CUDA availability, whether camera
    index 0 can be opened, and whether MMPose can be imported. Nothing is
    returned and no check raises; problems are reported as printed lines.
    """
    print("=== System Requirements Check ===")

    # Check CUDA
    if torch.cuda.is_available():
        print(f"✓ CUDA available: {torch.cuda.get_device_name(0)}")
        print(f"  Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")
    else:
        print("⚠ CUDA not available - will use CPU (slower)")

    # Check webcam
    cap = cv2.VideoCapture(0)
    try:
        if cap.isOpened():
            print("✓ Webcam available")
        else:
            print("⚠ Webcam not detected")
    finally:
        # Release the handle whether or not the device opened.
        cap.release()

    # Check MMPose installation
    try:
        import mmpose
        print(f"✓ MMPose version: {mmpose.__version__}")
    except Exception as exc:
        # Stay best-effort for any import-time failure, but no longer trap
        # KeyboardInterrupt/SystemExit, and show the cause of the failure.
        print("✗ MMPose not properly installed")
        print(f"  Reason: {type(exc).__name__}: {exc}")

    print("=== End Check ===")

# Run system check
# Executed whenever this cell runs; prints the CUDA, webcam and MMPose status.
check_system_requirements()

# Uncomment to run single image test:
# (requires the `rtmwd` inferencer created by the initialization cell)
# test_single_image()
