In [6]:
import os
import numpy as np
import cv2
import mediapipe as mp


# Determine which arm to use based on the z-coordinates of the shoulders.
def determine_arm(frame, pose_model):
    """
    Pick the arm to analyse from a single frame.

    The frame is converted from BGR to RGB, run through the pose model, and
    the z-values of the two shoulder landmarks are compared. The left arm is
    chosen only when the left shoulder's z is strictly smaller than the right
    shoulder's z (presumably meaning it is nearer the camera in MediaPipe's
    convention - confirm against the Pose documentation).

    Args:
        frame (numpy.ndarray): A single video frame in BGR format.
        pose_model: An instance of the Mediapipe Pose model.

    Returns:
        bool: True to use the right arm, False to use the left. True is also
        the fallback when no pose landmarks are detected in the frame.
    """
    detection = pose_model.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))

    # No pose found: fall back to the right arm.
    if not detection.pose_landmarks:
        return True

    joints = detection.pose_landmarks.landmark
    landmark_ids = mp.solutions.pose.PoseLandmark
    left_depth = joints[landmark_ids.LEFT_SHOULDER].z
    right_depth = joints[landmark_ids.RIGHT_SHOULDER].z

    # Right arm unless the left shoulder's z is strictly smaller.
    return not (left_depth < right_depth)
  
  
  
def _landmark_to_pixels(landmarks, landmark, frame_width, frame_height):
    """Return one landmark's (x, y) scaled by the frame size as a float array."""
    point = landmarks[landmark.value]
    return np.array([point.x * frame_width, point.y * frame_height])


def extract_raw_joint_data(video_path, output_path, use_normalised=False):
    """
    Extracts raw joint coordinate data (shoulder, elbow, wrist) from a video
    and saves it to an .npz file.

    The arm to track is chosen once, from the first frame, via
    ``determine_arm``. Every frame in which the pose model reports landmarks
    contributes one row; frames without a detection are skipped, so the
    saved timestamps may have gaps.

    Saved arrays:
        timestamps: shape (n,), ``frame_number / fps`` in seconds, where
            frame numbers are 1-based (the first frame is at ``1 / fps``).
        raw_data: shape (n, 6), columns
            [shoulder_x, shoulder_y, elbow_x, elbow_y, wrist_x, wrist_y].
            Landmark x/y are multiplied by the frame width/height (pixel
            units, assuming MediaPipe's normalised image coordinates).

    Args:
        video_path (str): Path to the input video.
        output_path (str): Path to save the .npz file.
        use_normalised (bool): Whether to normalise coordinates relative to
            the shoulder position (the shoulder then becomes the origin).

    Returns:
        None: Saves the raw joint data to an .npz file.

    Raises:
        ValueError: If the first frame cannot be read, or if the video
            reports a frame rate that is not a positive finite number.
    """
    # Initialise Mediapipe Pose model and open the video file
    mp_pose = mp.solutions.pose
    pose_model = mp_pose.Pose()
    cap = cv2.VideoCapture(video_path)

    timestamps = []
    raw_data = []

    try:
        # Keep the frame rate as a float: truncating e.g. 29.97 to 29 makes
        # every timestamp drift, and rates below 1 would truncate to 0.
        fps = float(cap.get(cv2.CAP_PROP_FPS))
        frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        # Read the first frame to determine which arm to track
        ret, first_frame = cap.read()
        if not ret:
            raise ValueError(f"Unable to read video frame from {video_path}")

        # Fail early with a clear message instead of dividing by zero later.
        if not (np.isfinite(fps) and fps > 0):
            raise ValueError(f"Invalid frame rate ({fps}) reported for {video_path}")

        if determine_arm(first_frame, pose_model):
            shoulder_landmark = mp_pose.PoseLandmark.RIGHT_SHOULDER
            elbow_landmark = mp_pose.PoseLandmark.RIGHT_ELBOW
            wrist_landmark = mp_pose.PoseLandmark.RIGHT_WRIST
        else:
            shoulder_landmark = mp_pose.PoseLandmark.LEFT_SHOULDER
            elbow_landmark = mp_pose.PoseLandmark.LEFT_ELBOW
            wrist_landmark = mp_pose.PoseLandmark.LEFT_WRIST

        # Reset video to the beginning so the first frame is processed too
        cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

        frame_idx = 0
        print(f"Processing video: {video_path}")

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            # Frame numbers are 1-based, so the first timestamp is 1 / fps.
            frame_idx += 1
            timestamp = frame_idx / fps
            image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = pose_model.process(image_rgb)

            # Frames without a detected pose contribute no row.
            if not results.pose_landmarks:
                continue

            landmarks = results.pose_landmarks.landmark
            shoulder = _landmark_to_pixels(
                landmarks, shoulder_landmark, frame_width, frame_height)
            elbow = _landmark_to_pixels(
                landmarks, elbow_landmark, frame_width, frame_height)
            wrist = _landmark_to_pixels(
                landmarks, wrist_landmark, frame_width, frame_height)

            if use_normalised:
                # Normalise coordinates relative to the shoulder position
                elbow = elbow - shoulder
                wrist = wrist - shoulder
                shoulder = np.zeros(2)  # Shoulder becomes the origin

            raw_data.append(np.concatenate([shoulder, elbow, wrist]))
            timestamps.append(timestamp)
    finally:
        # Release the capture and the model even if processing failed.
        cap.release()
        pose_model.close()

    # Convert to numpy arrays; the reshape keeps the (n, 6) layout even when
    # no frame produced a detection (n == 0).
    raw_data = np.array(raw_data, dtype=float).reshape(-1, 6)
    timestamps = np.array(timestamps, dtype=float)

    # Save to .npz file
    np.savez(output_path, timestamps=timestamps, raw_data=raw_data)
    print(f"✅ Saved raw joint data to {output_path}")


def process_video_directory(data_dir, output_dir, use_normalised=False):
    """
    Processes all videos in a directory, extracts raw joint data, and saves
    it to .npz files.

    Files whose names end in ``.mp4`` or ``.mov`` (matched case-insensitively,
    so ``IMG_0001.MOV`` is included) are processed in sorted name order. Each
    video ``<name>.<ext>`` is written to ``<output_dir>/<name>.npz``; a video
    is skipped when that file already exists. A failure on one video is
    printed and does not stop the remaining videos.

    Args:
        data_dir (str): Path to the directory containing video files.
        output_dir (str): Path to the directory to save .npz files. Created
            (including parents) if it does not exist.
        use_normalised (bool): Whether to normalise coordinates relative to the shoulder position.

    Returns:
        None
    """
    # exist_ok avoids the check-then-create race and is a no-op if present.
    os.makedirs(output_dir, exist_ok=True)

    video_extensions = ('.mp4', '.mov')
    # Sort so the processing order (and the log) is reproducible; os.listdir
    # returns entries in arbitrary order.
    video_files = sorted(
        name for name in os.listdir(data_dir)
        if name.lower().endswith(video_extensions)
    )

    for video_file in video_files:
        video_path = os.path.join(data_dir, video_file)
        output_path = os.path.join(output_dir, f"{os.path.splitext(video_file)[0]}.npz")

        # Skip processing if the NPZ file already exists
        if os.path.exists(output_path):
            print(f"⏭️ Skipping {video_file} - NPZ file already exists")
            continue

        # Best-effort batch: report the failure and carry on with the rest.
        try:
            extract_raw_joint_data(video_path, output_path, use_normalised=use_normalised)
        except Exception as e:
            print(f"❌ Error processing {video_file}: {e}")

In [7]:
# Input and output locations for the batch run. Both are relative paths, so
# they resolve against the notebook's current working directory.
data_dir = "./data"  # Replace with the path to your video directory
output_dir = "./raw_joint_data_npz"  # Replace with the path to save .npz files

# Process all videos and save raw joint data.
# use_normalised=False keeps the joint coordinates as extracted instead of
# shifting them so the shoulder is the origin. Videos whose .npz file already
# exists in output_dir are skipped, so re-running this cell only processes
# new videos.
process_video_directory(data_dir, output_dir, use_normalised=False)

⏭️ Skipping IMG_5751_cut.mov - NPZ file already exists
⏭️ Skipping IMG_5758_cut.mov - NPZ file already exists
⏭️ Skipping IMG_5748_cut.mov - NPZ file already exists
⏭️ Skipping IMG_5783_cut.mov - NPZ file already exists
⏭️ Skipping IMG_5767_cut.mov - NPZ file already exists
⏭️ Skipping IMG_5777_cut.mov - NPZ file already exists
⏭️ Skipping IMG_5802_cut.mov - NPZ file already exists
⏭️ Skipping IMG_5749_cut.mov - NPZ file already exists
⏭️ Skipping IMG_5759_cut.mov - NPZ file already exists
⏭️ Skipping IMG_5776_cut.mov - NPZ file already exists
⏭️ Skipping IMG_5766_cut.mov - NPZ file already exists
⏭️ Skipping IMG_5801_cut.mov - NPZ file already exists
⏭️ Skipping IMG_5764_cut.mov - NPZ file already exists
⏭️ Skipping IMG_5774_cut.mov - NPZ file already exists
⏭️ Skipping IMG_5799_cut.mov - NPZ file already exists
⏭️ Skipping IMG_5789_cut.mov - NPZ file already exists
⏭️ Skipping IMG_5753_cut.mov - NPZ file already exists
⏭️ Skipping IMG_5788_cut.mov - NPZ file already exists
⏭️ Skippin

I0000 00:00:1742233866.561750 3092792 gl_context.cc:369] GL version: 2.1 (2.1 Metal - 89.3), renderer: Apple M1 Pro
W0000 00:00:1742233866.655986 3126509 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1742233866.670594 3126509 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.


✅ Saved raw joint data to ./raw_joint_data_npz/IMG_5780_cut_FIXED.npz
⏭️ Skipping IMG_5796_cut.mov - NPZ file already exists
⏭️ Skipping IMG_5772b_cut.mov - NPZ file already exists
⏭️ Skipping IMG_5754_cut.mov - NPZ file already exists
