In [4]:
import os
import random
import cv2
import numpy as np
import torch
import torchvision 
from ultralytics import YOLO
from tqdm import tqdm


# --- Configuration ---
# Root folder of the dataset; videos are looked up under <label>/Raw_Video.
DATASET_DIR = "Dataset"
# File the annotated visualization video is written to.
OUTPUT_VIDEO_PATH = "keypoint_visualization_yolo.mp4"
# Run on the GPU when PyTorch reports one as available, otherwise on the CPU.
if torch.cuda.is_available():
    DEVICE = 'cuda'
else:
    DEVICE = 'cpu'

# --- Load YOLOv11-Pose Model ---
try:
    print("Loading YOLOv11-pose model...")
    # Loads the "yolo11n-pose.pt" checkpoint. To try a different pose model
    # (for example "yolov8n-pose.pt"), change the file name passed to YOLO().
    model = YOLO("yolo11n-pose.pt")
    model.to(DEVICE)
    print(f"YOLO model loaded successfully on device: {DEVICE}")
except Exception as e:
    print(f"Error loading YOLO model: {e}")
    # Nothing below can run without the model. Raise SystemExit directly
    # instead of calling exit(): that helper is injected by the `site` module
    # (so it is not always defined) and it reports success (status 0).
    raise SystemExit(1) from e

# --- Keypoint Drawing Utilities ---
# Maps each joint name to its row index in a (17, 3) keypoint array.
KEYPOINT_DICT = {
    joint_name: joint_index
    for joint_index, joint_name in enumerate((
        'nose',
        'left_eye', 'right_eye',
        'left_ear', 'right_ear',
        'left_shoulder', 'right_shoulder',
        'left_elbow', 'right_elbow',
        'left_wrist', 'right_wrist',
        'left_hip', 'right_hip',
        'left_knee', 'right_knee',
        'left_ankle', 'right_ankle',
    ))
}

# Pairs of keypoint indices that are joined by a line when drawing the skeleton.
SKELETON_EDGES = [
    (KEYPOINT_DICT[joint_a], KEYPOINT_DICT[joint_b])
    for joint_a, joint_b in (
        # Torso
        ('left_shoulder', 'right_shoulder'), ('left_hip', 'right_hip'),
        ('left_shoulder', 'left_hip'), ('right_shoulder', 'right_hip'),
        # Arms
        ('left_shoulder', 'left_elbow'), ('left_elbow', 'left_wrist'),
        ('right_shoulder', 'right_elbow'), ('right_elbow', 'right_wrist'),
        # Legs
        ('left_hip', 'left_knee'), ('left_knee', 'left_ankle'),
        ('right_hip', 'right_knee'), ('right_knee', 'right_ankle'),
        # Head
        ('nose', 'left_eye'), ('nose', 'right_eye'),
        ('left_eye', 'left_ear'), ('right_eye', 'right_ear'),
    )
]

def draw_keypoints(frame, keypoints, confidence_threshold=0.2):
    """Draw a filled green dot on ``frame`` for each confident keypoint.

    Args:
        frame: Image array drawn on in place; ``frame.shape`` must unpack into
            (height, width, channels).
        keypoints: Iterable of ``(y, x, confidence)`` triples whose ``y`` and
            ``x`` are fractions of the frame height and width.
        confidence_threshold: A keypoint is drawn only when its confidence is
            strictly greater than this value.
    """
    height, width, _channels = frame.shape
    dot_radius = 4
    dot_colour = (0, 255, 0)
    for rel_y, rel_x, score in keypoints:
        if not score > confidence_threshold:
            continue
        # Scale the normalized position back to pixel coordinates (x first).
        centre = (int(rel_x * width), int(rel_y * height))
        cv2.circle(frame, centre, dot_radius, dot_colour, -1)

def draw_skeleton(frame, keypoints, confidence_threshold=0.2):
    """Draw the limb segments listed in ``SKELETON_EDGES`` onto ``frame``.

    Args:
        frame: Image array drawn on in place; ``frame.shape`` must unpack into
            (height, width, channels).
        keypoints: Indexable collection of ``(y, x, confidence)`` triples whose
            ``y`` and ``x`` are fractions of the frame height and width.
        confidence_threshold: A segment is drawn only when both of its
            endpoints have a confidence strictly greater than this value.
    """
    height, width, _channels = frame.shape

    def to_pixel(keypoint):
        # Keypoints are stored as (y, x, conf); OpenCV points are (x, y).
        return (int(keypoint[1] * width), int(keypoint[0] * height))

    for first_idx, second_idx in SKELETON_EDGES:
        joint_a = keypoints[first_idx]
        joint_b = keypoints[second_idx]
        both_confident = (
            joint_a[2] > confidence_threshold
            and joint_b[2] > confidence_threshold
        )
        if not both_confident:
            continue
        cv2.line(frame, to_pixel(joint_a), to_pixel(joint_b), (255, 0, 0), 2)

def get_keypoints_from_result(result, frame_h, frame_w):
    """
    Extracts, normalizes, and formats the keypoints of one person from a YOLO result.

    When several people are detected, the person whose keypoints have the
    highest mean confidence is used.

    Args:
        result: Result object whose ``keypoints.data`` tensor is laid out as
            (people, keypoints, [x, y, confidence]) in pixel coordinates.
        frame_h: Frame height in pixels, used to normalize ``y``.
        frame_w: Frame width in pixels, used to normalize ``x``.

    Returns:
        np.ndarray: A float32 array of shape (17, 3) with [y, x, confidence]
        per keypoint, ``y`` and ``x`` expressed as fractions of the frame size.
        All zeros when no keypoints were detected. When either frame dimension
        is not positive the coordinates are left at zero and only the
        confidence column is filled.
    """
    if result.keypoints is None or len(result.keypoints.data) == 0:
        return np.zeros((17, 3), dtype=np.float32)

    kpts_tensor = result.keypoints.data

    # --- Handle multiple people: select the one with the highest avg confidence ---
    if kpts_tensor.shape[0] > 1:
        best_person_idx = int(kpts_tensor[:, :, 2].mean(dim=1).argmax())
    else:
        best_person_idx = 0

    person_kpts_np = kpts_tensor[best_person_idx].cpu().numpy()

    # Reorder the columns from [x, y, conf] to [y, x, conf] in one vectorized step.
    formatted_kps = np.zeros((17, 3), dtype=np.float32)
    formatted_kps[:, 2] = person_kpts_np[:17, 2]

    # Video metadata can report a frame size of 0; dividing by it would fill
    # the array with inf/NaN, which later breaks int() when drawing.
    if frame_h > 0 and frame_w > 0:
        formatted_kps[:, 0] = person_kpts_np[:17, 1] / frame_h
        formatted_kps[:, 1] = person_kpts_np[:17, 0] / frame_w

    return formatted_kps

def process_and_visualize_video(video_path: str, output_path: str, yolo_model):
    """Write a copy of a video with the detected pose drawn on every frame.

    Each frame is passed to ``yolo_model``, the keypoints of one person are
    extracted with ``get_keypoints_from_result`` and drawn as dots and skeleton
    lines, and the annotated frame is appended to the output video.

    Args:
        video_path: Path of the video to read.
        output_path: Path of the annotated video to write (``mp4v`` codec).
        yolo_model: Callable pose model; invoked as ``yolo_model(frame, verbose=False)``.

    Returns:
        None. Prints an error and returns early when the input video or the
        output writer cannot be opened.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Could not open video {video_path}")
        return

    out = None
    try:
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        num_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Keep the fractional frame rate (e.g. 29.97): truncating it to an int
        # changes the playback speed of the output. Fall back to 30 FPS when
        # the container reports no usable rate (0 or NaN).
        fps = cap.get(cv2.CAP_PROP_FPS)
        if not fps > 0:
            fps = 30.0

        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))
        if not out.isOpened():
            print(f"Error: Could not open video writer for {output_path}")
            return

        print(f"Processing video: {os.path.basename(video_path)}")
        # The reported frame count is used only to size the progress bar; the
        # loop itself runs until the decoder stops returning frames, so a
        # missing or inaccurate count cannot drop frames.
        progress_total = num_frames if num_frames > 0 else None
        with tqdm(total=progress_total, desc="Visualizing with YOLO") as progress:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # --- Run YOLO Inference ---
                # The frame is passed on exactly as cv2 decoded it.
                results = yolo_model(frame, verbose=False)

                # --- Extract, normalize, and format keypoints ---
                # Normalize by the decoded frame's own size so the drawing
                # helpers, which scale by frame.shape, undo it exactly.
                frame_h, frame_w = frame.shape[:2]
                keypoints = get_keypoints_from_result(results[0], frame_h, frame_w)

                # --- Drawing on a copy so the inference input stays untouched ---
                vis_frame = frame.copy()
                draw_keypoints(vis_frame, keypoints)
                draw_skeleton(vis_frame, keypoints)
                out.write(vis_frame)
                progress.update(1)
    finally:
        # Release the capture and writer even if inference or drawing raised.
        cap.release()
        if out is not None:
            out.release()

    print(f"\nVisualization complete! Video saved to: {output_path}")

if __name__ == "__main__":
    # Only files with a known video extension are candidates: the folders may
    # also contain other entries (hidden files, sub-directories) that cannot be
    # opened as a video.
    video_extensions = ('.mp4', '.avi', '.mov')

    all_videos = []
    for label_name in ["Fall", "No_Fall"]:
        video_folder = os.path.join(DATASET_DIR, label_name, "Raw_Video")
        if os.path.isdir(video_folder):
            all_videos.extend(
                os.path.join(video_folder, f)
                for f in sorted(os.listdir(video_folder))
                if f.lower().endswith(video_extensions)
            )

    if not all_videos:
        print("Error: No videos found. Check DATASET_DIR path.")
    else:
        # Visualize a single, randomly chosen video as a sanity check.
        random_video_path = random.choice(all_videos)
        process_and_visualize_video(random_video_path, OUTPUT_VIDEO_PATH, model)

Loading YOLOv11-pose model...
Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolo11n-pose.pt to 'yolo11n-pose.pt': 100% ━━━━━━━━━━━━ 6.0MB 13.0MB/s 0.5s
YOLO model loaded successfully on device: cuda
Processing video: S_N_332_resized.mp4


Visualizing with YOLO: 100%|██████████| 60/60 [00:02<00:00, 27.26it/s]


Visualization complete! Video saved to: keypoint_visualization_yolo.mp4





## Run on the full dataset

In [5]:
import os
import cv2
import numpy as np
import torch
from ultralytics import YOLO
from tqdm import tqdm

# --- Configuration ---
# Root folder of the raw video dataset (videos live in <label>/Raw_Video).
DATASET_DIR = "Dataset"
# Folder that receives the extracted keypoints, one .npy file per video.
OUTPUT_DIR = "processed_keypoints_yolo"
# Pose checkpoint handed to YOLO(); substitute another pose checkpoint name
# (for example 'yolov8n-pose.pt') to use a different model.
MODEL_NAME = "yolo11n-pose.pt"
# Run on the GPU when PyTorch reports one as available, otherwise on the CPU.
if torch.cuda.is_available():
    DEVICE = 'cuda'
else:
    DEVICE = 'cpu'

# --- 1. Model Loading ---
def load_yolo_model():
    """Build the YOLO pose model named by ``MODEL_NAME`` and move it to ``DEVICE``.

    Returns:
        The loaded model, or ``None`` when loading raised an exception; the
        error is printed instead of being propagated.
    """
    print(f"--- Loading YOLO-Pose Model: {MODEL_NAME} ---")
    try:
        loaded_model = YOLO(MODEL_NAME)
        loaded_model.to(DEVICE)
        print(f"Model loaded successfully on device: {DEVICE}")
    except Exception as load_error:
        print(f"FATAL: Error loading YOLO model: {load_error}")
        loaded_model = None
    return loaded_model

# --- 2. Main Processing Logic ---
def get_main_person_keypoints(result, frame_h, frame_w):
    """
    Return the normalized keypoints of the most prominent person in a YOLO result.

    The person with the largest bounding-box area is chosen when several are
    detected. The area is computed from columns 0-3 of ``result.boxes.data``,
    which are treated as x1, y1, x2, y2.

    Args:
        result: Result object exposing ``keypoints.data`` (people, keypoints,
            [x, y, confidence]) and ``boxes.data`` (one row per person).
        frame_h: Frame height in pixels, used to normalize ``y``.
        frame_w: Frame width in pixels, used to normalize ``x``.

    Returns:
        np.ndarray: A float32 array of shape (17, 3) with [y, x, confidence]
        per keypoint. All zeros when keypoints or boxes are missing, empty, or
        differ in count. When a frame dimension is not positive, the
        coordinates are zero and only the confidence is kept.
    """
    def no_detection():
        return np.zeros((17, 3), dtype=np.float32)

    # Boxes can be present without keypoints (or the reverse), so check both.
    if result.keypoints is None or result.boxes is None:
        return no_detection()
    if len(result.keypoints.data) == 0 or len(result.boxes.data) == 0:
        return no_detection()
    # Row i of the boxes must describe the same person as keypoint set i.
    if len(result.boxes.data) != len(result.keypoints.data):
        return no_detection()

    all_keypoints = result.keypoints.data
    all_boxes = result.boxes.data

    # --- Strategy: the largest bounding box marks the main person ---
    chosen_idx = 0
    if all_boxes.shape[0] > 1:
        box_widths = all_boxes[:, 2] - all_boxes[:, 0]
        box_heights = all_boxes[:, 3] - all_boxes[:, 1]
        chosen_idx = (box_widths * box_heights).argmax()

    chosen_np = all_keypoints[chosen_idx].cpu().numpy()

    # The frame size does not change between keypoints, so test it once.
    can_normalize = frame_h > 0 and frame_w > 0

    normalized = np.zeros((17, 3), dtype=np.float32)
    for row in range(17):
        px, py, score = chosen_np[row]
        if can_normalize:
            normalized[row] = [py / frame_h, px / frame_w, score]
        else:
            normalized[row] = [0.0, 0.0, score]

    return normalized


def extract_keypoints_from_video(video_path, yolo_model):
    """
    Extracts keypoints from all frames of a single video using YOLO.

    Args:
        video_path (str): Path to the video file.
        yolo_model: The loaded YOLO model object; invoked as
            ``yolo_model(frame, verbose=False)``.

    Returns:
        np.ndarray: A float32 array of shape (num_frames, 17, 3) containing
        normalized keypoints [y, x, confidence] for the main person in each
        frame, or ``None`` when the video cannot be opened or yields no frames.
    """
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Warning: Could not open video {video_path}. Skipping.")
        return None

    video_keypoints = []
    try:
        # Read until the decoder stops returning frames rather than looping
        # over CAP_PROP_FRAME_COUNT: a reported count of 0, or one smaller than
        # the real length, would otherwise drop frames silently.
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            # Run YOLO inference and keep the first result object.
            result = yolo_model(frame, verbose=False)[0]

            # Normalize by the decoded frame's own size: it is the image that
            # was passed to the model, and container metadata may report 0.
            frame_h, frame_w = frame.shape[:2]
            video_keypoints.append(get_main_person_keypoints(result, frame_h, frame_w))
    finally:
        # Release the capture even if inference raised part-way through.
        cap.release()

    if not video_keypoints:
        return None

    return np.array(video_keypoints, dtype=np.float32)


def process_all_videos(dataset_dir, output_dir, yolo_model):
    """
    Scans the dataset directory, processes each video, and saves the keypoints.

    Videos are read from ``<dataset_dir>/<label>/Raw_Video`` for the labels
    "Fall" and "No_Fall"; keypoints are written to
    ``<output_dir>/<label>/<video name>.npy``. A video whose output file
    already exists is skipped, which makes the run resumable.

    Args:
        dataset_dir: Root directory of the raw video dataset.
        output_dir: Root directory for the generated .npy files (created if missing).
        yolo_model: Loaded model, forwarded to ``extract_keypoints_from_video``.
    """
    # Create the base output directory
    os.makedirs(output_dir, exist_ok=True)

    videos_to_process = []
    # Discover all video files and their corresponding output paths
    for label_name in ["Fall", "No_Fall"]:
        video_folder = os.path.join(dataset_dir, label_name, "Raw_Video")
        output_label_folder = os.path.join(output_dir, label_name)
        os.makedirs(output_label_folder, exist_ok=True)

        if not os.path.isdir(video_folder):
            print(f"Warning: Folder not found {video_folder}")
            continue

        # Sorted so the processing order is the same on every run.
        for video_filename in sorted(os.listdir(video_folder)):
            if not video_filename.lower().endswith(('.mp4', '.avi', '.mov')):
                continue

            video_path = os.path.join(video_folder, video_filename)
            video_basename = os.path.splitext(video_filename)[0]
            output_npy_path = os.path.join(output_label_folder, f"{video_basename}.npy")

            videos_to_process.append((video_path, output_npy_path))

    print(f"Found {len(videos_to_process)} videos to process.")

    failed_videos = []

    # Process each video with a progress bar
    pbar = tqdm(videos_to_process, desc="Processing videos")
    for video_path, output_npy_path in pbar:
        pbar.set_description(f"Processing {os.path.basename(video_path)}")

        # This makes the script resumable. If a file exists, skip it.
        if os.path.exists(output_npy_path):
            continue

        all_keypoints = extract_keypoints_from_video(video_path, yolo_model)
        if all_keypoints is None:
            failed_videos.append(video_path)
            continue

        # Write to a temporary file and rename it into place. If the run is
        # interrupted mid-write, no truncated .npy is left behind for the
        # "already exists" check above to mistake for a finished result.
        # Saving through a file object stops np.save from appending ".npy"
        # to the temporary name.
        tmp_path = output_npy_path + ".tmp"
        with open(tmp_path, "wb") as tmp_file:
            np.save(tmp_file, all_keypoints)
        os.replace(tmp_path, output_npy_path)

    print("\n--- Pre-processing complete! ---")
    if failed_videos:
        print(f"Warning: {len(failed_videos)} video(s) produced no keypoints and were skipped.")
    print(f"All keypoint data has been saved to: {output_dir}")

# --- 3. Main Execution ---
if __name__ == "__main__":
    yolo_model = load_yolo_model()

    # load_yolo_model() signals failure by returning None; test for exactly
    # that instead of relying on the truthiness of the model object.
    if yolo_model is not None:
        process_all_videos(DATASET_DIR, OUTPUT_DIR, yolo_model)

--- Loading YOLO-Pose Model: yolo11n-pose.pt ---
Model loaded successfully on device: cuda
Found 6988 videos to process.


Processing W090.mp4: 100%|██████████| 6988/6988 [2:42:30<00:00,  1.40s/it]                      


--- Pre-processing complete! ---
All keypoint data has been saved to: processed_keypoints_yolo



