In [None]:
import numpy as np
import matplotlib.pyplot as plt
from collections import Counter
from tqdm import tqdm
from collections import defaultdict
from math import log2
from random import shuffle
import copy

In [None]:
# Load both the 3D and 2D datasets
# Here the GRP of the H36M dataset is taken as an example
# NOTE(security): allow_pickle=True unpickles arbitrary Python objects, so only
# point these paths at archives that you produced yourself or otherwise trust.
_h36m_source_dir = "./h36m_keypoints_with_eplhl_classed_action/h36m_keypoints_with_eplhl_classed_action_18299"

# Open each archive exactly once and close it as soon as the dicts have been
# extracted; the 2D archive provides both the keypoints and the metadata.
with np.load(_h36m_source_dir + "/data_3d_h36m.npz", allow_pickle=True) as archive_3d:
    keypoints_3d_both = archive_3d['positions_3d'].item()
with np.load(_h36m_source_dir + "/data_2d_h36m_gt.npz", allow_pickle=True) as archive_2d:
    keypoints_2d_both = archive_2d['positions_2d'].item()
    meta = archive_2d['metadata'].item()

In [None]:
# Display the metadata object that was read from the 2D keypoints archive
meta

In [None]:
# Helper functions
def to_skeletal(pose):
    """Express a pose relative to its first joint.

    The joint stored at index 0 is taken as the origin and subtracted from
    every entry of ``pose``, so the returned pose has joint 0 at zero.
    (Presumably index 0 is the root/pelvis joint - confirm against the
    dataset's joint order.)
    """
    return pose - pose[0]
    
# Functions for angle calculation and discretization
def calculate_angle(v1, v2):
    """Return the angle between two vectors, in degrees.

    The angle is undefined when either vector has zero length; in that case
    0.0 is returned instead of dividing by zero.
    """
    length_1 = np.linalg.norm(v1)
    if length_1 == 0:
        return 0.0
    length_2 = np.linalg.norm(v2)
    if length_2 == 0:
        return 0.0

    # Rounding can push the cosine marginally outside [-1, 1]; clamp it so
    # that arccos stays defined.
    cosine = np.clip(np.dot(v1, v2) / (length_1 * length_2), -1.0, 1.0)
    return np.degrees(np.arccos(cosine))


def discretize_angle(angle, lower_limit, upper_limit, step_size):
    """Map an angle to the index of its ``step_size``-wide bin.

    Bins are counted upwards from ``lower_limit``. Angles outside the range
    are clamped to the nearest end bin.

    Parameters:
        angle: The angle to discretize.
        lower_limit: Start of the range; anything below it falls in bin 0.
        upper_limit: End of the range; anything above it falls in the last
            bin, ``(upper_limit - lower_limit) // step_size``.
        step_size: Width of one bin, in the same unit as the angle.

    Returns:
        int: The bin index, always a plain ``int`` regardless of whether the
        limits or the step size are floats.
    """
    if angle < lower_limit:
        return 0
    if angle > upper_limit:
        # Floor division yields a float for float operands; cast so the
        # clamped bin has the same type as every other bin.
        return int((upper_limit - lower_limit) // step_size)
    return int((angle - lower_limit) // step_size)

# Main functions for pose discretization
def get_angles_from_pose(pose):
    """Measure every modelled degree of freedom (DoF) of a pose, in degrees.

    Ten bone vectors are built as differences between joints of ``pose``
    (joint indices 0-15 are read). Each DoF is the unsigned angle between one
    bone vector and one coordinate axis: flexion/extension uses the z axis,
    abduction/adduction and lateral flexion use the y axis, and rotation uses
    the x axis.

    Returns:
        dict: DoF name -> angle in degrees, in a fixed insertion order
        (hips, knees, spine, neck, shoulders, elbows). A zero-length bone
        vector produces a printed warning and an angle of 0.
    """
    # Coordinate axes the bone vectors are compared against
    axis_x = np.array([1, 0, 0])
    axis_y = np.array([0, 1, 0])
    axis_z = np.array([0, 0, 1])

    def angle_between(v1, v2):
        """Angle between two vectors in degrees; 0 (with a warning) if either is zero."""
        scale = np.linalg.norm(v1) * np.linalg.norm(v2)
        if scale == 0:  # at least one of the vectors has no length
            print("Warning: Encountered zero vector!")
            print("v1:", v1)
            print("v2:", v2)
            return 0
        # Clamp against rounding error before taking arccos
        cosine = np.clip(np.dot(v1, v2) / scale, -1.0, 1.0)
        return np.degrees(np.arccos(cosine))

    # Bone vectors, each pointing from the second joint towards the first
    bones = {
        "RightHip": pose[0] - pose[1],
        "RightKnee": pose[1] - pose[2],
        "LeftHip": pose[0] - pose[4],
        "LeftKnee": pose[4] - pose[5],
        "Spine": pose[7] - pose[8],
        "Neck": pose[8] - pose[9],
        "RightShoulder": pose[8] - pose[14],
        "RightElbow": pose[14] - pose[15],
        "LeftShoulder": pose[8] - pose[11],
        "LeftElbow": pose[11] - pose[12],
    }

    # (DoF name, bone, reference axis); the order here is the order of the
    # returned dict, which callers may rely on when flattening the values.
    dof_table = (
        # Hips
        ("RHip_Flexion_Extension", "RightHip", axis_z),
        ("RHip_Abduction_Adduction", "RightHip", axis_y),
        ("RHip_Internal_External_Rotation", "RightHip", axis_x),
        ("LHip_Flexion_Extension", "LeftHip", axis_z),
        ("LHip_Abduction_Adduction", "LeftHip", axis_y),
        ("LHip_Internal_External_Rotation", "LeftHip", axis_x),
        # Knees
        ("RKnee_Flexion_Extension", "RightKnee", axis_z),
        ("LKnee_Flexion_Extension", "LeftKnee", axis_z),
        # Spine and neck
        ("Spine_Flexion_Extension", "Spine", axis_z),
        ("Spine_Lateral_Flexion", "Spine", axis_y),
        ("Spine_Rotation", "Spine", axis_x),
        ("Neck_Flexion_Extension", "Neck", axis_z),
        ("Neck_Lateral_Flexion", "Neck", axis_y),
        # Shoulders
        ("RShoulder_Flexion_Extension", "RightShoulder", axis_z),
        ("RShoulder_Abduction_Adduction", "RightShoulder", axis_y),
        ("RShoulder_Internal_External_Rotation", "RightShoulder", axis_x),
        ("LShoulder_Flexion_Extension", "LeftShoulder", axis_z),
        ("LShoulder_Abduction_Adduction", "LeftShoulder", axis_y),
        ("LShoulder_Internal_External_Rotation", "LeftShoulder", axis_x),
        # Elbows
        ("RElbow_Flexion_Extension", "RightElbow", axis_z),
        ("LElbow_Flexion_Extension", "LeftElbow", axis_z),
    )

    return {
        dof_name: angle_between(bones[bone_name], axis)
        for dof_name, bone_name, axis in dof_table
    }


def discretize_pose(angles, step_size=None):
    """Convert a pose's DoF angles into bin indices.

    Parameters:
        angles (dict): DoF name -> angle in degrees. Every key must be one of
            the DoF names listed in ``angular_limits`` below.
        step_size: Bin width in degrees. When omitted, the module-level
            ``angular_step_size`` is used, which must then have been defined
            before this function is called.

    Returns:
        dict: DoF name -> bin index, in the same order as ``angles``.

    Raises:
        KeyError: If ``angles`` contains a DoF without an entry in
            ``angular_limits``.
    """
    # (lower, upper) range in degrees for each degree of freedom.
    # NOTE(review): get_angles_from_pose produces unsigned angles in [0, 180]
    # (they come from arccos), so the part of each range below 0 is never hit
    # and e.g. (-40, 40) clamps every angle above 40 into the last bin -
    # confirm that this is intended.
    angular_limits = {
        "RHip_Flexion_Extension": (-30, 120),
        "RHip_Abduction_Adduction": (-40, 40),
        "RHip_Internal_External_Rotation": (-45, 45),
        "LHip_Flexion_Extension": (-30, 120),
        "LHip_Abduction_Adduction": (-40, 40),
        "LHip_Internal_External_Rotation": (-45, 45),
        "RKnee_Flexion_Extension": (0, 150),
        "LKnee_Flexion_Extension": (0, 150),
        "Spine_Flexion_Extension": (-60, 60),
        "Spine_Lateral_Flexion": (-30, 30),
        "Spine_Rotation": (-60, 60),
        "Neck_Flexion_Extension": (-45, 45),
        "Neck_Lateral_Flexion": (-20, 20),
        "RShoulder_Flexion_Extension": (-90, 180),
        "RShoulder_Abduction_Adduction": (0, 180),
        "RShoulder_Internal_External_Rotation": (-90, 90),
        "LShoulder_Flexion_Extension": (-90, 180),
        "LShoulder_Abduction_Adduction": (0, 180),
        "LShoulder_Internal_External_Rotation": (-90, 90),
        "RElbow_Flexion_Extension": (0, 150),
        "LElbow_Flexion_Extension": (0, 150)
    }

    # Nothing to discretize; return before touching the global fallback.
    if not angles:
        return {}

    if step_size is None:
        # Backward-compatible default: the notebook-level bin width.
        step_size = angular_step_size

    discretized = {}
    for dof, angle in angles.items():
        lower, upper = angular_limits[dof]
        discretized[dof] = discretize_angle(angle, lower, upper, step_size)
    return discretized

# Joint labels, one per keypoint (17 in total).
# NOTE(review): this list places the left leg/arm before the right, whereas
# get_angles_from_pose treats indices 1-2 as the right leg, 4-5 as the left
# leg, 11-12 as the left arm and 14-15 as the right arm - confirm which order
# matches the dataset before using these labels as an index lookup.
joint_names = [
    "Pelvis",
    "LeftHip", "LeftKnee", "LeftAnkle",
    "RightHip", "RightKnee", "RightAnkle",
    "Spine",
    "Thorax", "Neck", "Head",
    "RightShoulder", "RightElbow", "RightWrist",  # comma was missing, which merged this entry with "LeftShoulder"
    "LeftShoulder", "LeftElbow", "LeftWrist",
]

In [None]:
def compute_total_categories(step_size):
    """Count the distinct discretized poses that are possible for a bin width.

    Each degree of freedom contributes ``(upper - lower) // step_size + 1``
    bins (indices 0 up to and including the clamped top bin); the result is
    the product of those counts over all degrees of freedom.

    Parameters:
        step_size: Bin width in degrees.

    Returns:
        The number of possible bin-index combinations.
    """
    # (lower, upper) range in degrees for each degree of freedom
    dof_ranges = {
        # Hips
        "RHip_Flexion_Extension": (-30, 120),
        "RHip_Abduction_Adduction": (-40, 40),
        "RHip_Internal_External_Rotation": (-45, 45),
        "LHip_Flexion_Extension": (-30, 120),
        "LHip_Abduction_Adduction": (-40, 40),
        "LHip_Internal_External_Rotation": (-45, 45),
        # Knees
        "RKnee_Flexion_Extension": (0, 150),
        "LKnee_Flexion_Extension": (0, 150),
        # Spine and neck
        "Spine_Flexion_Extension": (-60, 60),
        "Spine_Lateral_Flexion": (-30, 30),
        "Spine_Rotation": (-60, 60),
        "Neck_Flexion_Extension": (-45, 45),
        "Neck_Lateral_Flexion": (-20, 20),
        # Shoulders
        "RShoulder_Flexion_Extension": (-90, 180),
        "RShoulder_Abduction_Adduction": (0, 180),
        "RShoulder_Internal_External_Rotation": (-90, 90),
        "LShoulder_Flexion_Extension": (-90, 180),
        "LShoulder_Abduction_Adduction": (0, 180),
        "LShoulder_Internal_External_Rotation": (-90, 90),
        # Elbows
        "RElbow_Flexion_Extension": (0, 150),
        "LElbow_Flexion_Extension": (0, 150),
    }

    combinations = 1
    for range_start, range_end in dof_ranges.values():
        bins = ((range_end - range_start) // step_size) + 1
        combinations *= bins
    return combinations

In [None]:
def discretize_dataset_with_mapping(data, max_per_category=150, seed=None,
                                    excluded_subjects=('S9', 'S11')):
    """Discretize every pose of a dataset and pick the surplus poses to prune.

    Each pose goes through ``to_skeletal`` -> ``get_angles_from_pose`` ->
    ``discretize_pose``; the resulting bin indices, taken in dict order, form
    the pose's category key.

    Parameters:
        data (dict): subject -> {action -> sequence of poses}.
        max_per_category (int): Maximum number of poses to keep per category.
        seed: Optional seed for the random selection. When given, a private
            ``random.Random(seed)`` is used so the pruning list is
            reproducible; when ``None`` the module-level ``shuffle`` (global
            random state) is used, as before.
        excluded_subjects: Subjects that are skipped entirely. Defaults to
            ``('S9', 'S11')``.

    Returns:
        tuple: ``(discretized_poses, mapping, pruning_list)`` where
        ``discretized_poses`` is the list of category keys in visiting order,
        ``mapping`` maps each key to its ``(subject, action, idx)``
        identifiers, and ``pruning_list`` holds the identifiers beyond
        ``max_per_category`` in each over-full category. The identifier lists
        of over-full categories in ``mapping`` are left in shuffled order.
    """
    # Local import: the notebook only imports `shuffle` from `random`.
    from random import Random

    shuffle_in_place = shuffle if seed is None else Random(seed).shuffle

    discretized_poses = []
    mapping = defaultdict(list)  # category key -> identifiers of its poses

    for subject, actions in data.items():
        if subject in excluded_subjects:
            continue
        for action, poses in tqdm(actions.items()):
            for idx, pose in enumerate(poses):
                angles = get_angles_from_pose(to_skeletal(pose))
                # A tuple of the bin indices is hashable, so it can serve as
                # the dictionary key for the category.
                key = tuple(discretize_pose(angles).values())
                discretized_poses.append(key)
                mapping[key].append((subject, action, idx))

    # Randomly choose which poses to drop from over-full categories
    pruning_list = []
    for identifiers in mapping.values():
        if len(identifiers) > max_per_category:
            shuffle_in_place(identifiers)
            pruning_list.extend(identifiers[max_per_category:])

    return discretized_poses, mapping, pruning_list

In [None]:
def count_poses_in_dataset_3d(dataset):
    """
    Return how many poses a 3D dataset holds across all subjects and actions.

    Parameters:
        dataset (dict): Maps subject name -> {action name -> sequence of poses}.

    Returns:
        int: Sum of ``len(poses)`` over every action of every subject.
    """
    return sum(
        len(poses)
        for actions in dataset.values()
        for poses in actions.values()
    )

In [None]:
def count_poses_in_dataset_2d(dataset):
    """
    Return how many poses a 2D dataset holds, counting one entry per action.

    Parameters:
        dataset (dict): Maps subject name -> {action name -> sequence of pose
                        sequences}. Only the first inner sequence of each
                        action is counted (presumably one inner sequence per
                        camera view - confirm against the data).

    Returns:
        int: Sum of ``len(poses[0])`` over every action of every subject.
    """
    return sum(
        len(views[0])
        for actions in dataset.values()
        for views in actions.values()
    )

In [None]:
# step size = 10 degrees
# Width of one angle bin. discretize_pose reads this module-level name at call
# time, so this cell has to run before any pose is discretized.
angular_step_size = 10

In [None]:
# Discretize every pose of the 3D dataset (subjects S9 and S11 are skipped inside
# the function) and cap each pose category at 150 entries.
# `pruning_list` holds the (subject, action, idx) identifiers of the surplus poses;
# use it to remove them from the original dataset. The surplus is picked by a
# random shuffle with no seed passed here, so the list can differ between runs.
discretized_data, mapping, pruning_list = discretize_dataset_with_mapping(keypoints_3d_both, max_per_category=150)

In [None]:
# Create deep copies so the loaded datasets stay untouched
pruned_3d_dataset = copy.deepcopy(keypoints_3d_both)
pruned_2d_dataset = copy.deepcopy(keypoints_2d_both)

# Deduplicate the pruning list and order it from highest to lowest identifier
# (identifiers are (subject, action, idx) tuples, so plain tuple ordering applies)
pruning_list = sorted(set(pruning_list), reverse=True)

# Group the frame indices to drop by (subject, action) sequence
grouped_pruning_list = defaultdict(list)
for subject, action, idx in pruning_list:
    grouped_pruning_list[(subject, action)].append(idx)

# Perform the pruning with a single np.delete per array. Deleting one index at
# a time copies the whole array for every removed frame, and an IndexError
# half-way through would leave the 3D array pruned but the 2D arrays not.
for (subject, action), indices in grouped_pruning_list.items():
    poses_3d = pruned_3d_dataset[subject][action]
    views_2d = pruned_2d_dataset[subject][action]

    # An index may only be dropped if it exists in the 3D array and in every
    # 2D array; otherwise the two datasets would fall out of step.
    frame_limit = min([len(poses_3d)] + [len(view) for view in views_2d])
    valid_indices = []
    for idx in indices:
        if idx < frame_limit:
            valid_indices.append(idx)
        else:
            print(f"Index {idx} is out of bounds for {subject}, {action}. Skipping.")
    if not valid_indices:
        continue

    # Remove from 3D dataset
    pruned_3d_dataset[subject][action] = np.delete(poses_3d, valid_indices, axis=0)
    # Remove the same frames from every array of the 2D dataset
    pruned_2d_dataset[subject][action] = [np.delete(view, valid_indices, axis=0) for view in views_2d]


In [None]:
# Sanity check: number of poses left in the pruned 3D dataset (all subjects and actions)
total_poses = count_poses_in_dataset_3d(pruned_3d_dataset)
print(f"Total number of poses in the dataset: {total_poses}")

In [None]:
# Sanity check: number of poses left in the pruned 2D dataset. The 2D counter only
# looks at the first entry of each action, so this should match the 3D total above
# if both datasets were pruned consistently. Note that `total_poses` is overwritten here.
total_poses = count_poses_in_dataset_2d(pruned_2d_dataset)
print(f"Total number of poses in the dataset: {total_poses}")

In [None]:
# Write the pruned datasets under the same array names that the loading cell reads
# ('positions_3d', 'positions_2d', 'metadata'); the metadata is passed through unchanged.
# NOTE: the target directory is not created here, so it has to exist already.
# NOTE(review): the dicts are presumably stored as pickled object arrays, which is why
# the loader uses allow_pickle=True and .item() - confirm before changing the format.
np.savez("./h36m_keypoints_with_eplhl_classed_action/h36m_95_with_eplhl_classed_action_18299/data_3d_h36m.npz", positions_3d=pruned_3d_dataset)
np.savez("./h36m_keypoints_with_eplhl_classed_action/h36m_95_with_eplhl_classed_action_18299/data_2d_h36m_gt.npz", positions_2d=pruned_2d_dataset, metadata=meta)