## Preprocess FS-Jump3D
- 2D keypoints are estimated from the videos with `DWposeDetector`
- 3D data comes from the JSON files, formatted to the H36M layout

In [1]:
import cv2
import numpy as np
import sys
import os
import glob
sys.path.append(os.path.dirname(os.getcwd()))

from tqdm import tqdm
from dwpose.scripts.dwpose import DWposeDetector
from dwpose.scripts.tool import read_frames
from PIL import Image
import torch
import warnings
from ultralytics import YOLO

from pathlib import Path
warnings.filterwarnings('ignore')

In [2]:
# Compute device: the first CUDA GPU when one is available, otherwise the CPU.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# Directory holding the DWPose / YOLOX config files. Set the DWPOSE_CONFIG_DIR
# environment variable to run on a machine where the repository lives
# elsewhere; the default is the location this notebook was written against.
DWPOSE_CONFIG_DIR = os.environ.get(
    "DWPOSE_CONFIG_DIR",
    "D:\\github\\skating-ai\\v3\\pose\\dwpose\\config",
)

detector = DWposeDetector(
    det_config=os.path.join(DWPOSE_CONFIG_DIR, "yolox_l_8xb8-300e_coco.py"),
    pose_config=os.path.join(DWPOSE_CONFIG_DIR, "dwpose-l_384x288.py"),
    # det_ckpt / pose_ckpt are intentionally not passed, so the detector's
    # own default checkpoints are used.
    keypoints_only=True,
)
detector = detector.to(device)

Loads checkpoint by http backend from path: https://download.openmmlab.com/mmdetection/v2.0/yolox/yolox_l_8x8_300e_coco/yolox_l_8x8_300e_coco_20211126_140236-d3bd2b23.pth
Loads checkpoint by http backend from path: https://huggingface.co/wanghaofan/dw-ll_ucoco_384/resolve/main/dw-ll_ucoco_384.pth


In [3]:
import numpy as np
from ultralytics import YOLO

# Body joint count assumed only when a video contains no detection at all
# (the same 18-joint skeleton the original zero placeholder was sized for).
_BODY_JOINTS_FALLBACK = 18


def _as_numpy(values):
    """Convert a torch tensor (or any array-like) to a float NumPy array."""
    if hasattr(values, "cpu"):
        values = values.cpu().numpy()
    return np.asarray(values, dtype=float)


def _largest_box(boxes, normalized):
    """Return (x1, y1, x2, y2) of the largest-area box, or None if there is none.

    `normalized` selects the box coordinates expressed as a fraction of the
    image size (`xyxyn`) instead of pixels (`xyxy`).
    """
    best_box, best_area = None, 0.0
    for box in boxes:
        coords = box.xyxyn if normalized else box.xyxy
        x1, y1, x2, y2 = _as_numpy(coords).reshape(-1)[:4]
        area = (x2 - x1) * (y2 - y1)
        if area > best_area:
            best_box, best_area = (x1, y1, x2, y2), area
    return best_box


def _select_person(keypoint, boxes):
    """Return the index of the pose that lies inside the largest person box.

    The pose detector and YOLO order their detections independently, so a YOLO
    box index cannot be used as a pose index. Instead the pose with the most
    keypoints inside the largest YOLO box is chosen. Whenever no match can be
    made (no boxes, or no keypoint inside the box) the last pose is returned,
    which is the fallback the original code used.
    """
    last_person = keypoint.shape[0] - 1
    if last_person == 0 or keypoint.size == 0 or boxes is None or len(boxes) == 0:
        return last_person

    # NOTE(review): keypoints are presumed to be either pixel coordinates or
    # coordinates normalised to [0, 1]; if nothing exceeds 1.0 they are treated
    # as normalised and compared against normalised boxes. Confirm against the
    # DWposeDetector implementation in use.
    normalized = float(np.max(keypoint)) <= 1.0
    target = _largest_box(boxes, normalized)
    if target is None:
        return last_person

    x1, y1, x2, y2 = target
    xs, ys = keypoint[..., 0], keypoint[..., 1]
    hits = ((xs >= x1) & (xs <= x2) & (ys >= y1) & (ys <= y2)).sum(axis=1)
    if hits.max() == 0:
        return last_person
    return int(np.argmax(hits))


def estimate2d(video_path, detector, yolo_model_path='yolov8n.pt'):
    """Estimate per-frame 2D keypoints of the largest person in a video.

    Args:
        video_path: Path of the video, passed to `read_frames`.
        detector: Callable pose detector; `detector(frame)` must return a dict
            with `["bodies"]["candidate"]` (coordinates) and
            `["bodies"]["subset"]` (one row of per-joint values per person).
        yolo_model_path: Weights used for the YOLO person detector that decides
            which pose to keep when several people are detected.

    Returns:
        Array of shape (num_frames, num_joints - 1, 3): x, y and the `subset`
        value of every joint except the first one. Frames without any detected
        person are all zeros.
    """
    # A fresh model per call; inference is forced onto the CPU to avoid CUDA issues.
    yolo_model = YOLO(yolo_model_path)
    yolo_model.to('cpu')

    frames = read_frames(video_path)
    kpts2d = []      # per frame: (J, 2) array, or None when nobody was detected
    score2d = []     # per frame: (J,) array, or None when nobody was detected
    out_joints = None

    for frame in frames:
        pose = detector(frame)
        candidate = np.asarray(pose["bodies"]["candidate"])
        subset = np.asarray(pose["bodies"]["subset"])

        if subset.ndim < 2 or subset.shape[0] == 0:
            # No person detected: filled with zeros once the joint count is known.
            kpts2d.append(None)
            score2d.append(None)
            continue

        num_person, num_joints = subset.shape[0], subset.shape[1]
        keypoint = candidate.reshape(num_person, num_joints, 2)

        # YOLO only matters when there is a choice to make; skipping it for a
        # single pose does not change the result.
        boxes = None
        if num_person > 1:
            boxes = yolo_model(frame, classes=[0], verbose=False, device='cpu')[0].boxes
        person_idx = _select_person(keypoint, boxes)

        # The first joint is dropped from both coordinates and scores.
        selected_keypoints = keypoint[person_idx][1:]
        selected_scores = subset[person_idx][1:]
        out_joints = selected_keypoints.shape[0]

        kpts2d.append(selected_keypoints)
        score2d.append(selected_scores)

    if out_joints is None:
        out_joints = _BODY_JOINTS_FALLBACK - 1
    if not kpts2d:
        return np.zeros((0, out_joints, 3))

    # Placeholders share the joint count of real detections so frames stack cleanly.
    empty_kpts = np.zeros((out_joints, 2))
    empty_scores = np.zeros(out_joints)
    kpts = np.array([empty_kpts if k is None else k for k in kpts2d])
    scores = np.array([empty_scores if s is None else s for s in score2d])

    return np.concatenate([kpts, np.expand_dims(scores, axis=-1)], axis=-1)

In [4]:
def process_video(video_path):
    """Estimate 2D keypoints for one video and save them as a `.npy` file.

    The output name is built from the last two directory names of the path and
    the file stem, all lower-cased: `<skater>_<camera>_<stem>_2D.npy`, written
    into the notebook-level `output_dir`. Videos whose output file already
    exists are skipped.

    The array is written to a temporary file and then moved into place, so an
    interrupted run never leaves a truncated file that the skip check would
    treat as finished.

    Errors are reported on stdout and not re-raised, so one bad video does not
    stop a batch run.

    Args:
        video_path: Path to the video; it must have at least two parent
            directory components.
    """
    tmp_path = None
    try:
        video = Path(video_path)
        skater = video.parts[-3].lower()
        camera = video.parts[-2].lower()
        filename = video.stem.lower()
        output_name = f"{skater}_{camera}_{filename}_2D.npy"

        output_path = os.path.join(output_dir, output_name)
        if os.path.exists(output_path):
            return

        keypoints = estimate2d(video_path, detector)

        # Write through a file handle so NumPy does not append ".npy" to the
        # temporary name, then publish the finished file in a single step.
        tmp_path = f"{output_path}.tmp"
        with open(tmp_path, "wb") as handle:
            np.save(handle, keypoints)
        os.replace(tmp_path, output_path)
    except Exception as e:
        print(f"✗ Error processing {video_path}: {str(e)}")
        if tmp_path is not None and os.path.exists(tmp_path):
            try:
                os.remove(tmp_path)
            except OSError:
                # Best-effort cleanup; the original error was already reported.
                pass

In [5]:
# Dataset root and keypoint output directory. Both can be overridden with
# environment variables; the defaults are the locations this notebook was
# written against.
FS_JUMP3D_DATA_DIR = os.environ.get("FS_JUMP3D_DATA_DIR", "D:\\github\\FS-Jump3D\\data")
output_dir = os.environ.get("KEYPOINTS_OUTPUT_DIR", "D:\\github\\MotionAGFormer\\data\\keypoints")

# Every .mp4 below the dataset root, sorted so the processing order is the same on every run.
video_paths = sorted(glob.glob(os.path.join(FS_JUMP3D_DATA_DIR, "**", "*.mp4"), recursive=True))
os.makedirs(output_dir, exist_ok=True)

In [6]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation

def animate(frame_num, keypoints, ax=None):
    """Draw the keypoints of one frame as a scatter plot.

    Args:
        frame_num: Index of the frame to draw.
        keypoints: Array indexed as [frame, joint, channel]; channel 0 is used
            as x and channel 1 as y.
        ax: Axes to draw on. When omitted, a notebook-level variable named
            `ax` is used if it exists, otherwise the current pyplot axes.
    """
    if ax is None:
        # Keeps the earlier behaviour of drawing on a notebook-level `ax`, but
        # no longer fails with NameError when none has been created.
        ax = globals().get("ax")
    if ax is None:
        ax = plt.gca()

    ax.clear()

    # Keypoints of the requested frame
    frame = keypoints[frame_num]
    ax.scatter(frame[:, 0], frame[:, 1], s=100, c='red')

    ax.invert_yaxis()
    ax.set_title(f'Frame {frame_num}')
    # Limits span the whole sequence so the view stays fixed between frames;
    # the y limits are passed as (max, min) so y grows downwards.
    ax.set_xlim(np.min(keypoints[:, :, 0]), np.max(keypoints[:, :, 0]))
    ax.set_ylim(np.max(keypoints[:, :, 1]), np.min(keypoints[:, :, 1]))

In [7]:
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock

# Number of videos processed concurrently; tune to the machine.
max_workers = 4

with ThreadPoolExecutor(max_workers=max_workers) as pool:
    # Queue every video up front, then advance the progress bar as each job finishes.
    futures = [pool.submit(process_video, path) for path in video_paths]

    progress = tqdm(
        as_completed(futures),
        total=len(video_paths),
        desc="Processing videos",
    )
    for finished in progress:
        # Retrieving the result surfaces any exception raised inside the worker.
        result = finished.result()

Processing videos: 100%|██████████████████████████████████████████████████████████████████████████████████████████████| 3036/3036 [15:11:41<00:00, 18.02s/it]
