### We aim to normalize the keypoints extracted from different videos. Across signing videos, the signer may be at a different distance from the camera, the frames may have a different height/width ratio, and signers may differ in height, skeletal proportions, etc. We want to normalize the keypoints so that all of these differences are eliminated.

In [45]:
import sys
import os
import time
import pickle
import cv2
import matplotlib.pyplot as plt
import mediapipe as mp
import pickle
import shutil
import multiprocessing
import logging
import numpy as np
import gc
import traceback

from collections import Counter, defaultdict
from copy import deepcopy
from tqdm import tqdm
from multiprocessing import Pool, Manager
from pathlib import Path

# Change to the model directory for DWPose
sys.path.append('./ailab_DWPose_not_git/ControlNet-v1-1-nightly/')
from annotator.dwpose import DWposeDetector_canlin_no_output_img

In [46]:
# ---- Module-level configuration and MediaPipe solution aliases ----
parsing_resolution = 1024 # input image resolution used for parsing (always 1024)

mp_pose = mp.solutions.pose # standalone pose solution
mp_face_mesh = mp.solutions.face_mesh # standalone FaceMesh solution
mp_hands = mp.solutions.hands # standalone hands solution
mp_drawing = mp.solutions.drawing_utils # drawing utilities

mp_holistic = mp.solutions.holistic  # holistic solution: used for hand disambiguation/trimming, etc.

In [47]:
#detect the landmarks from the image
def mediapipe_detection(image, model):
    """Run a MediaPipe solution object on a single BGR frame.

    Args:
        image: frame in BGR channel order (as read by OpenCV).
        model: object exposing ``process(rgb_image)``.

    Returns:
        Tuple ``(bgr_image, results)``: the frame converted back to BGR and
        whatever ``model.process`` returned.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    # Mark the buffer read-only while the model looks at it, then restore.
    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True
    return cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR), results


#formly define the landmark (keypoint) extraction function
def extract_keypoints_holistic(results):
    """Flatten the landmark groups of a holistic result into 1D arrays.

    Each landmark contributes ``(x, y, z)``. A group that was not detected is
    replaced by zeros of the matching length, so every returned array has a
    fixed size whether or not the group was found:
    pose 33*3, face 468*3, each hand 21*3.

    Args:
        results: object with ``pose_landmarks``, ``face_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks`` attributes;
            each is either falsy or has a ``landmark`` sequence whose items
            expose ``x``, ``y`` and ``z``.

    Returns:
        Tuple ``(pose, face, lh, rh)`` of 1D numpy arrays.
    """
    def _flatten_or_zeros(landmark_group, num_points):
        # Missing group -> zeros with the same length a detection would have.
        if not landmark_group:
            return np.zeros(num_points * 3)
        return np.array([[pt.x, pt.y, pt.z] for pt in landmark_group.landmark]).flatten()

    # The pose fallback used to be 33*4 although only x, y, z are extracted,
    # which made the pose length depend on whether a pose was detected.
    pose = _flatten_or_zeros(results.pose_landmarks, 33)
    face = _flatten_or_zeros(results.face_landmarks, 468)
    lh = _flatten_or_zeros(results.left_hand_landmarks, 21)
    rh = _flatten_or_zeros(results.right_hand_landmarks, 21)
    return (pose, face, lh, rh)


'''
When there's an unexpected handedness value for a single detected hand, 
we assume it to be the right hand and extract its keypoints.

If two hands are detected and either one or both have unexpected handedness values, 
we extract keypoints for both hands, assigning the first detected hand as right and the 
second as left.

If the handedness value is all expected. We will append the keypoints accordingly.
'''

def extract_keypoints(pose_results, face_mesh_results, hand_results):
    """Flatten pose, face and hand landmarks from the three separate models.

    Hand assignment rules:
      * one hand with an unexpected handedness label: it is returned as the
        right hand (a warning is printed, no final swap is applied);
      * two hands where any label is unexpected: first detected hand is
        returned as right, second as left (warning printed, no final swap);
      * otherwise hands are assigned from their labels (two hands sharing the
        same label: first -> right, second -> left) and then left and right
        are swapped, because the handedness labels assume a mirrored image.

    Args:
        pose_results: object with a ``pose_landmarks`` attribute.
        face_mesh_results: object with a ``multi_face_landmarks`` attribute.
        hand_results: object with ``multi_hand_landmarks`` and
            ``multi_handedness`` attributes.

    Returns:
        Tuple ``(pose, face, right_hand, left_hand)`` of 1D numpy arrays
        (zeros of size 33*3, 478*3, 21*3, 21*3 for anything not detected).
    """
    def _flatten(landmark_group):
        # One (x, y, z) row per landmark, flattened to 1D.
        return np.array([[pt.x, pt.y, pt.z] for pt in landmark_group.landmark]).flatten()

    if pose_results.pose_landmarks:
        pose = _flatten(pose_results.pose_landmarks)
    else:
        pose = np.zeros(33 * 3)

    if face_mesh_results.multi_face_landmarks:
        face = _flatten(face_mesh_results.multi_face_landmarks[0])
    else:
        face = np.zeros(478 * 3)

    # Hands default to all zeros until a detection fills them in.
    right_hand = np.zeros(21 * 3)
    left_hand = np.zeros(21 * 3)

    detected_hands = hand_results.multi_hand_landmarks
    hand_count = len(detected_hands) if detected_hands else 0
    known_labels = ('Right', 'Left')

    if hand_count == 1:
        label = hand_results.multi_handedness[0].classification[0].label
        only_hand = _flatten(detected_hands[0])

        if label not in known_labels:
            print(f"Warning: Unexpected handedness value '{label}'")
            return pose, face, only_hand, left_hand

        if label == 'Right':
            right_hand = only_hand
        else:
            left_hand = only_hand

    elif hand_count == 2:
        label_0 = hand_results.multi_handedness[0].classification[0].label
        label_1 = hand_results.multi_handedness[1].classification[0].label
        first_hand = _flatten(detected_hands[0])
        second_hand = _flatten(detected_hands[1])

        if label_0 not in known_labels or label_1 not in known_labels:
            print(f"Warning: Unexpected handedness values '{label_0}' and '{label_1}'")
            return pose, face, first_hand, second_hand

        # Only the (Left, Right) ordering puts the second hand on the right;
        # (Right, Left) and identical labels both keep detection order.
        if label_0 == 'Left' and label_1 == 'Right':
            right_hand, left_hand = second_hand, first_hand
        else:
            right_hand, left_hand = first_hand, second_hand

    # The handedness labels assume a mirrored image, which does not hold for
    # Signing Savvy videos, so left and right are exchanged.
    right_hand, left_hand = left_hand, right_hand

    return pose, face, right_hand, left_hand

In [48]:
def reshape_array(arr):
    """Return ``arr`` viewed as rows of three values.

    Raises:
        ValueError: if ``len(arr)`` is not divisible by 3.
    """
    _, leftover = divmod(len(arr), 3)
    if leftover:
        raise ValueError("The length of the array must be a multiple of 3.")
    return arr.reshape(-1, 3)


def zero_rate(arr):
    """Return the fraction of entries of ``arr`` that are exactly zero.

    Args:
        arr: 1D numpy array.

    Returns:
        Number of zero entries divided by ``len(arr)``.
    """
    return np.sum(arr == 0) / len(arr)


#two supportive functions:
def euclidean_distance(point1, point2):
    """Return the 2D Euclidean distance between two points.

    Only indices 0 (x) and 1 (y) of each point are read.
    """
    dx = point1[0] - point2[0]
    dy = point1[1] - point2[1]
    return (dx**2 + dy**2) ** 0.5


def dimensionwise_distance(hand1, hand2):
    """Sum of absolute x/y differences between two flattened hands.

    Both inputs are flat ``x, y, z, x, y, z, ...`` sequences; every third
    entry (the z value) is skipped.
    """
    return sum(
        abs(first - second)
        for position, (first, second) in enumerate(zip(hand1, hand2))
        if position % 3 != 2
    )


'''We apply the strict L-2 distance here'''
def dimensionwise_distance_single_max(hand1, hand2):
    """Largest per-keypoint 2D L2 distance between two flattened hands.

    Inputs are flat ``x, y, z, ...`` sequences. A keypoint pair counts as
    distance 0 when either side has both x and y equal to zero.

    Raises:
        ValueError: if either length is not a multiple of 3.
    """
    if len(hand1) % 3 or len(hand2) % 3:
        raise ValueError("Hand keypoints should be a multiple of 3")

    def _is_missing(hand, start):
        # A keypoint is treated as absent when x and y are both zero.
        return hand[start] == 0 and hand[start + 1] == 0

    per_point = []
    for start in range(0, len(hand1), 3):
        if _is_missing(hand1, start) or _is_missing(hand2, start):
            per_point.append(0)
        else:
            dx = hand1[start] - hand2[start]
            dy = hand1[start + 1] - hand2[start + 1]
            per_point.append((dx**2 + dy**2) ** 0.5)
    return max(per_point)


'''we calculate the average-distance based on L2'''
def dimensionwise_distance_single_avg(hand1, hand2):
    """Mean per-keypoint 2D L2 distance between two flattened hands.

    Inputs are flat ``x, y, z, ...`` sequences. Returns ``0.`` when more than
    half of the values of either hand are zero, or when there are no keypoints.

    Raises:
        ValueError: if either length is not a multiple of 3.
    """
    if len(hand1) % 3 or len(hand2) % 3:
        raise ValueError("Hand keypoints should be a multiple of 3")

    # A mostly-empty hand gives no meaningful distance.
    if zero_rate(hand1) > 0.5 or zero_rate(hand2) > 0.5:
        return 0.

    gaps = []
    for start in range(0, len(hand1), 3):
        dx = hand1[start] - hand2[start]
        dy = hand1[start + 1] - hand2[start + 1]
        gaps.append((dx**2 + dy**2) ** 0.5)

    # Guard the division for the no-keypoint case.
    if not gaps:
        return 0.
    return sum(gaps) / len(gaps)

In [49]:
'''
We define overlapping hands like this: the left and right hands are both non-zero, and
their element-wise distance is no larger than the threshold. In addition, we require that
the overlapped hands are close to one wrist, while the other wrist is far away, missing, or outside the image.

In this case, we delete one hand (set it to zeros): we assume the left/right hand
assignment is correct, which has basically been shown to hold on character sign videos.
Then, we delete the hand that belongs to the far-away pose wrist.

Note that the distance threshold here is different from the one in correct_handedness_temporal.
'''

#function to detect & remove one hand if both hands are overlapping
def handle_overlap_hand_new(pose_landmark, left_hand, right_hand):
    """Zero out one hand when both detected hands overlap near a single wrist.

    Args:
        pose_landmark: flat ``x, y, z, ...`` pose array; landmark 15 is used
            as the left wrist and landmark 16 as the right wrist.
        left_hand: flat ``x, y, z, ...`` left-hand array.
        right_hand: flat ``x, y, z, ...`` right-hand array.

    Returns:
        Tuple ``(left_hand, right_hand)``; a removed hand is replaced by
        ``np.zeros(21 * 3)``, otherwise the inputs are returned unchanged.
    """
    # Only meaningful when both hands contain at least one non-zero value.
    both_present = any(value != 0 for value in left_hand) and any(value != 0 for value in right_hand)
    if not both_present:
        return(left_hand, right_hand)

    hand_d = dimensionwise_distance(left_hand, right_hand)

    left_wrist = [pose_landmark[15 * 3], pose_landmark[15 * 3 + 1]]
    right_wrist = [pose_landmark[16 * 3], pose_landmark[16 * 3 + 1]]
    left_root = [left_hand[0], left_hand[1]]
    right_root = [right_hand[0], right_hand[1]]

    # Naming: <pose wrist>_d_<hand>, e.g. l_d_r = left wrist to right hand root.
    l_d_r = euclidean_distance(left_wrist, right_root)
    l_d_l = euclidean_distance(left_wrist, left_root)
    r_d_r = euclidean_distance(right_wrist, right_root)
    r_d_l = euclidean_distance(right_wrist, left_root)

    # 0.08 = "close to wrist", 0.3 = "far from wrist", 0.93 = wrist treated as
    # outside the image. The 0.4 hand-overlap threshold is deliberately
    # stricter than the 0.6 used in the SL Generation code.
    near, far, edge = 0.08, 0.3, 0.93
    left_wrist_outside = left_wrist[0] >= edge or left_wrist[1] >= edge
    right_wrist_outside = right_wrist[0] >= edge or right_wrist[1] >= edge

    # A hand sits on one wrist while the opposite wrist is far or out of frame.
    anchored_to_one_wrist = (
        (r_d_r <= near and (l_d_r >= far or left_wrist_outside))
        or (l_d_r <= near and (r_d_r >= far or right_wrist_outside))
        or (r_d_l <= near and (l_d_l >= far or left_wrist_outside))
        or (l_d_l <= near and (r_d_l >= far or right_wrist_outside))
    )

    if hand_d <= 0.4 and anchored_to_one_wrist:
        # Right wrist is the close one -> drop the left hand.
        if r_d_r <= near or r_d_l <= near:
            left_hand = np.zeros(21 * 3)

        # Left wrist is the close one -> drop the right hand.
        if l_d_r <= near or l_d_l <= near:
            right_hand = np.zeros(21 * 3)

    return(left_hand, right_hand)

In [50]:
def count_invalid_dimensions(array_2):
    """Count keypoints in ``array_2`` whose x or y is below -0.9.

    A keypoint is invalid when *either* coordinate is (approximately) -1;
    the ``< -0.9`` comparison is used instead of an exact match with -1.
    An earlier version of this helper (in extract_kp_from_Kylie_data)
    required both coordinates to be -1, which only holds when x and y have
    been forced to -1 together beforehand.

    Parameters:
    - array_2: iterable of keypoints ``[x, y]`` (DWPose output).

    Returns:
    - int: number of invalid keypoints.
    """
    return sum(1 for point in array_2 if point[0] < -0.9 or point[1] < -0.9)

In [51]:
def mp_DW_hand_dist(array_1, array_2):
    """Mean and max 2D distance between matching keypoints of two hands.

    Keypoints of ``array_2`` with x or y below -0.9 are treated as invalid and
    excluded (together with their counterparts in ``array_1``). This
    "either coordinate" rule differs from extract_kp_from_Kylie_data.

    Parameters:
    - array_1 (np.ndarray): 1D array ``[x1, y1, z1, ..., xn, yn, zn]``; z is ignored.
    - array_2: 2D keypoints ``[[x1, y1], ..., [xn, yn]]``.

    Returns:
    - (average_distance, maximum_distance) over the valid keypoints, or
      ``(10.0, 10.0)`` when no keypoint is valid.
    """
    xy_array_1 = array_1.reshape(-1, 3)[:, :2]
    array_2 = np.array(array_2)

    # True where the array_2 keypoint is usable.
    keep = np.array([not (point[0] < -0.9 or point[1] < -0.9) for point in array_2])

    gaps = np.linalg.norm(xy_array_1[keep] - array_2[keep], axis=1)

    # No valid keypoint left: report a large sentinel distance.
    if gaps.size == 0:
        return float(10), float(10)

    return(np.mean(gaps), np.max(gaps))

In [52]:
'''
Given a MediaPipe hand, we want to decide whether it is more 'left' or more 'right':
    We compute the distance between this hand and each DWPose hand; 1/distance is the score.
    We compute the distance between this hand's wrist and each DWPose wrist; 0.4/distance is the score.
    We add up the scores to get a left score and a right score, indicating how 'left' and how 'right' the given hand is.
'''
# this is the function to actually choose the most appropriate hand
def left_or_right_for_given_hand(given_hand, l_h_DW, r_h_DW, l_w_DW, r_w_DW):
    """Score how 'left' and how 'right' a MediaPipe hand looks.

    Two kinds of votes are accumulated per side:
      * hand vote: ``1 / average distance`` to the reference hand, counted
        only when that reference has at most 3 invalid keypoints;
      * wrist vote: ``0.4 / distance`` between the reference wrist and the
        first keypoint of ``given_hand``, counted only when the wrist x and y
        are both >= -0.9 and both non-zero.

    Args:
        given_hand: flat ``x, y, z, ...`` hand array.
        l_h_DW, r_h_DW: left/right reference hands as ``[x, y]`` keypoints.
        l_w_DW, r_w_DW: left/right reference wrists; indices 0 and 1 are read.

    Returns:
        Tuple ``(l_score, r_score)``.
    """
    eps = 0.00000001        # avoids division by zero
    wrist_weight = 0.4      # weight of the wrist vote relative to the hand vote

    def _usable(wrist):
        # Neither the -1 "invalid" marker nor an all-zero placeholder.
        return wrist[0] >= -0.9 and wrist[1] >= -0.9 and wrist[0] != 0 and wrist[1] != 0

    def _gap_to_hand_root(wrist):
        return np.sqrt((wrist[0] - given_hand[0])**2 + (wrist[1] - given_hand[1])**2)

    l_score, r_score = 0., 0.

    # Hand votes.
    if count_invalid_dimensions(l_h_DW) <= 3:
        ave_l, _ = mp_DW_hand_dist(given_hand, l_h_DW)
        l_score += 1/(ave_l + eps)

    if count_invalid_dimensions(r_h_DW) <= 3:
        ave_r, _ = mp_DW_hand_dist(given_hand, r_h_DW)
        r_score += 1/(ave_r + eps)

    # Wrist votes.
    if _usable(l_w_DW):
        l_score += wrist_weight/(_gap_to_hand_root(l_w_DW) + eps)

    if _usable(r_w_DW):
        r_score += wrist_weight/(_gap_to_hand_root(r_w_DW) + eps)

    return(l_score, r_score)

In [53]:
#function to finally decide left/right hand
def decide_l_r_hand(lh, rh, lh_DW, rh_DW, lw, rw):
    """Decide whether the MediaPipe left/right hands must be swapped.

    Each hand that is less than 20% zeros is scored against the reference
    hands and wrists; a mostly-zero hand scores 0 on both sides. The single
    highest of the four scores decides: if it says a hand matches the
    opposite side, the hands are swapped.

    Args:
        lh, rh: flat MediaPipe left/right hand arrays.
        lh_DW, rh_DW: reference left/right hands (``[x, y]`` keypoints).
        lw, rw: reference left/right wrists.

    Returns:
        Tuple ``(left_hand, right_hand, flag)`` where ``flag`` is
        ``'Reverted'`` when the hands were swapped, else ``'Not_reverted'``.
    """
    # Label format: "<side it resembles>_<side it is currently assigned to>".
    scores = {'l_l': 0., 'r_l': 0., 'l_r': 0., 'r_r': 0.}

    if zero_rate(lh) < 0.2:
        scores['l_l'], scores['r_l'] = left_or_right_for_given_hand(lh, lh_DW, rh_DW, lw, rw)

    if zero_rate(rh) < 0.2:
        scores['l_r'], scores['r_r'] = left_or_right_for_given_hand(rh, lh_DW, rh_DW, lw, rw)

    # Stable descending sort: ties keep the l_l, r_l, l_r, r_r order.
    ranked = sorted(scores.items(), key=lambda entry: entry[1], reverse=True)
    resembles, assigned = ranked[0][0].split('_')

    if resembles == assigned:
        return(lh, rh, 'Not_reverted')
    return(rh, lh, 'Reverted')

In [54]:
def fix_scattered_keypoints(keypoints, bd_1=0.96, bd_2=0.86):
    """Invalidate scattered hand keypoints, modifying ``keypoints`` in place.

    Steps:
      1. Any keypoint with x == -1 or y == -1 becomes ``[-1, -1]``.
      2. If, among the remaining keypoints, some have ``y >= bd_1``, some
         have ``y <= bd_2`` and none lie strictly between the two bounds,
         every keypoint with ``y <= bd_2`` is considered scattered and is
         set to ``[-1, -1]``.

    :param keypoints: hand keypoints as rows of ``[x, y]``.
    :param bd_1: lower-edge bound on y.
    :param bd_2: upper bound on y below which points are candidates for removal.
    :return: the same ``keypoints`` object after the in-place fixes.
    """
    # Step 1: a half-invalid point is made fully invalid.
    for idx, point in enumerate(keypoints):
        if point[0] == -1 or point[1] == -1:
            keypoints[idx] = np.asarray([-1, -1])

    # y values of the points that are still valid.
    valid_ys = [point[1] for point in keypoints if point[1] != -1]

    near_bottom = any(y >= bd_1 for y in valid_ys)
    above_band = any(y <= bd_2 for y in valid_ys)
    band_is_empty = not any(bd_2 < y < bd_1 for y in valid_ys)

    # Step 2: a gap between the two groups marks the upper group as scattered.
    if near_bottom and above_band and band_is_empty:
        for idx, point in enumerate(keypoints):
            if point[1] != -1 and point[1] <= bd_2:
                keypoints[idx] = np.asarray([-1, -1])

    return keypoints

In [55]:
def obtain_key_vector(file_name, model):
    """Extract per-frame keypoints from a video with MediaPipe and ``model``.

    For every frame the holistic, pose, face-mesh and hands MediaPipe
    solutions are run, overlapping hands are resolved, ``model(image)`` is
    called, and the MediaPipe left/right hand assignment is checked against
    the hands returned by ``model``.

    Args:
        file_name: path of the video, passed to ``cv2.VideoCapture``.
        model: callable taking a BGR frame and returning a dict that has
            ``'hands'`` (index 0 is used as the left hand, index 1 as the
            right hand) and ``'bodies'['candidate']``
            (presumably a DWPose detector -- confirm against the caller).

    Returns:
        Tuple ``(vec_array, fps, number_of_frames, frame_height, frame_width)``
        where ``vec_array`` holds one dict per frame: the ``model`` output
        (scattered hand points fixed, body candidates cut to the first 18
        rows) extended with the keys ``pose_mp``, ``face_mp``,
        ``pose_holistic_mp``, ``face_holistic_mp``, ``left_hand_mp``,
        ``right_hand_mp``, ``left_hand_holistic_mp``,
        ``right_hand_holistic_mp``, ``hand_revert`` and
        ``holistic_hand_revert``.
    """
    vec_array = list()

    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic, \
         mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose_func, \
         mp_face_mesh.FaceMesh(static_image_mode=True, max_num_faces=1, refine_landmarks=True, min_detection_confidence=0.5, min_tracking_confidence=0.5) as face_mesh_func, \
         mp_hands.Hands(min_detection_confidence=0.5, min_tracking_confidence=0.5) as hands_func:

        vidcap = cv2.VideoCapture(file_name)
        try:
            fps = vidcap.get(cv2.CAP_PROP_FPS)
            number_of_frames = int(vidcap.get(cv2.CAP_PROP_FRAME_COUNT))

            # Original video size.
            frame_height = int(vidcap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            frame_width = int(vidcap.get(cv2.CAP_PROP_FRAME_WIDTH))

            success, image = vidcap.read()
            while success:

                ##### MediaPipe detections #####
                _, results = mediapipe_detection(image, holistic)
                _, pose_results = mediapipe_detection(image, pose_func)
                _, face_mesh_results = mediapipe_detection(image, face_mesh_func)
                _, hand_results = mediapipe_detection(image, hands_func)

                # Separate models: note the (right, left) return order.
                pose, face, rh, lh = extract_keypoints(pose_results, face_mesh_results, hand_results)

                # Holistic model: (left, right) return order.
                pose_ho, face_ho, lh_ho, rh_ho = extract_keypoints_holistic(results)

                # Resolve overlapping hands first; both hand sets are checked
                # against the pose from the standalone pose model.
                lh, rh = handle_overlap_hand_new(pose, lh, rh)
                lh_ho, rh_ho = handle_overlap_hand_new(pose, lh_ho, rh_ho)

                kp_vectors = model(image)

                fixed_l = fix_scattered_keypoints(kp_vectors['hands'][0])
                fixed_r = fix_scattered_keypoints(kp_vectors['hands'][1])
                kp_vectors['hands'][0] = fixed_l
                kp_vectors['hands'][1] = fixed_r
                kp_vectors['bodies']['candidate'] = deepcopy(kp_vectors['bodies']['candidate'][:18, :])

                #### Left/right hand correction of the MediaPipe hands ####
                # pose[45:48] / pose[48:51] are pose landmarks 15 and 16 (wrists).
                lh, rh, revert = decide_l_r_hand(lh, rh, kp_vectors['hands'][0],
                                                         kp_vectors['hands'][1],
                                                         pose[45:48], pose[48:51])

                lh_ho, rh_ho, revert_ho = decide_l_r_hand(lh_ho, rh_ho, kp_vectors['hands'][0],
                                                                        kp_vectors['hands'][1],
                                                                        pose[45:48], pose[48:51])

                #### Attach the MediaPipe results ####
                kp_vectors['pose_mp'] = pose
                kp_vectors['face_mp'] = face

                kp_vectors['pose_holistic_mp'] = pose_ho
                kp_vectors['face_holistic_mp'] = face_ho

                # Prefer the standalone hands model; fall back to the holistic
                # hand when the standalone one is mostly zeros.
                if zero_rate(lh) < 0.2:
                    kp_vectors['left_hand_mp'] = lh
                else:
                    kp_vectors['left_hand_mp'] = lh_ho

                if zero_rate(rh) < 0.2:
                    kp_vectors['right_hand_mp'] = rh
                else:
                    kp_vectors['right_hand_mp'] = rh_ho

                kp_vectors['left_hand_holistic_mp'] = lh_ho
                kp_vectors['right_hand_holistic_mp'] = rh_ho

                kp_vectors['hand_revert'] = revert
                kp_vectors['holistic_hand_revert'] = revert_ho

                vec_array.append(kp_vectors)

                success, image = vidcap.read()
        finally:
            # Release the capture even if a detector raises mid-video.
            vidcap.release()

    return vec_array, fps, number_of_frames, frame_height, frame_width

In [56]:
# !!!!!!!!!!!!! not used in fact
def resize_keypoints(keypoints, resize_height_rate, resize_width_rate):
    """
    Resize keypoints by given rates, keeping the upper-left point (0,0) as invariant.

    The input is never modified; a new array is returned. Non-float input
    (for example a list of ints) is promoted to float first, because scaling
    an integer array in place by a float rate raises a casting error.

    Args:
        keypoints: array-like in either:
                    - 1D format (x1,y1,z1, x2,y2,z2, ...)
                    - 2D format ((x1,y1), (x2,y2), ...)
        resize_height_rate: float, rate to resize in height (y-direction)
        resize_width_rate: float, rate to resize in width (x-direction)

    Returns:
        numpy array with same format as input but with resized coordinates
        (z values, when present, are left unchanged)

    Raises:
        ValueError: for 1D input whose length is not a multiple of 3.
    """
    # np.array copies, so all in-place scaling below touches only the copy.
    keypoints = np.array(keypoints)
    if not np.issubdtype(keypoints.dtype, np.inexact):
        keypoints = keypoints.astype(float)

    if keypoints.ndim == 1:
        # 1D format with z-values: view as (n_points, 3), scale x and y only.
        n_points = len(keypoints) // 3
        points = keypoints.reshape(n_points, 3)
        points[:, 0] *= resize_width_rate   # x coordinates
        points[:, 1] *= resize_height_rate  # y coordinates
        return points.reshape(-1)

    # 2D format without z-values.
    keypoints[:, 0] *= resize_width_rate    # x coordinates
    keypoints[:, 1] *= resize_height_rate   # y coordinates
    return keypoints


# Example usage and testing
# Demo of resize_keypoints on both supported input layouts; prints the input
# and the resized output (the captured cell output follows this block).
if __name__ == "__main__":
    # 1D layout (with z): only x and y are scaled
    kp_1d = np.array([0.2, 0.3, 0.1, 0.4, 0.5, 0.2])  # Two points: (0.2, 0.3, 0.1), (0.4, 0.5, 0.2)
    resized_1d = resize_keypoints(kp_1d, resize_height_rate=2.0, resize_width_rate=1.5)
    print("Original 1D:", kp_1d)
    print("Resized 1D:", resized_1d)
    print(type(resized_1d))
    
    # 2D layout (x, y pairs)
    kp_2d = np.array([[0.2, 0.3], [0.4, 0.5]])  # Two points: (0.2, 0.3), (0.4, 0.5)
    resized_2d = resize_keypoints(kp_2d, resize_height_rate=2.0, resize_width_rate=1.5)
    print("\nOriginal 2D:", kp_2d)
    print("Resized 2D:", resized_2d)
    print(type(resized_2d))

Original 1D: [0.2 0.3 0.1 0.4 0.5 0.2]
Resized 1D: [0.3 0.6 0.1 0.6 1.  0.2]
<class 'numpy.ndarray'>

Original 2D: [[0.2 0.3]
 [0.4 0.5]]
Resized 2D: [[0.3 0.6]
 [0.6 1. ]]
<class 'numpy.ndarray'>


In [57]:
# !!!!!!!!!!!!! not used in fact
def move_keypoints(keypoints, x0, y0):
    """
    Move keypoints by adding (x0, y0) to all (x, y) coordinates.
    Z coordinates (if present) remain unchanged.

    The input is never modified; a new array is returned. Non-float input
    (for example a list of ints) is promoted to float first, because adding a
    float offset to an integer array in place raises a casting error.

    Args:
        keypoints: array-like in either:
                    - 1D format (x1,y1,z1, x2,y2,z2, ...)
                    - 2D format ((x1,y1), (x2,y2), ...)
        x0: float, displacement in x direction
        y0: float, displacement in y direction

    Returns:
        numpy array with same format as input but with moved coordinates

    Raises:
        ValueError: for 1D input whose length is not a multiple of 3.
    """
    # np.array copies, so all in-place shifts below touch only the copy.
    keypoints = np.array(keypoints)
    if not np.issubdtype(keypoints.dtype, np.inexact):
        keypoints = keypoints.astype(float)

    if keypoints.ndim == 1:
        # 1D format with z-values: view as (n_points, 3), shift x and y only.
        n_points = len(keypoints) // 3
        points = keypoints.reshape(n_points, 3)
        points[:, 0] += x0   # x coordinates
        points[:, 1] += y0   # y coordinates
        return points.reshape(-1)

    # 2D format without z-values.
    keypoints[:, 0] += x0    # x coordinates
    keypoints[:, 1] += y0    # y coordinates
    return keypoints

# Example usage and testing
# Demo of move_keypoints on both supported input layouts; prints the input
# and the shifted output (the captured cell output follows this block).
if __name__ == "__main__":
    # 1D layout (with z): only x and y are shifted
    kp_1d = np.array([0.2, 0.3, 0.1, 0.4, 0.5, 0.2])  # Two points: (0.2, 0.3, 0.1), (0.4, 0.5, 0.2)
    moved_1d = move_keypoints(kp_1d, x0=0.1, y0=-0.2)
    print("Original 1D:", kp_1d)
    print("Moved 1D:", moved_1d)
    
    # 2D layout (x, y pairs)
    kp_2d = np.array([[0.2, 0.3], [0.4, 0.5]])  # Two points: (0.2, 0.3), (0.4, 0.5)
    moved_2d = move_keypoints(kp_2d, x0=0.1, y0=-0.2)
    print("\nOriginal 2D:", kp_2d)
    print("Moved 2D:", moved_2d)

Original 1D: [0.2 0.3 0.1 0.4 0.5 0.2]
Moved 1D: [0.3 0.1 0.1 0.5 0.3 0.2]

Original 2D: [[0.2 0.3]
 [0.4 0.5]]
Moved 2D: [[0.3 0.1]
 [0.5 0.3]]


In [58]:
def compute_l2_distance(array1, array2):
    """
    Return the L2 (Euclidean) distance between two equally long sequences.

    Parameters:
    array1, array2: array-likes of the same length

    Returns:
    float: square root of the summed squared element-wise differences
    """
    # array-likes are converted so plain lists can be subtracted element-wise.
    difference = np.asarray(array1) - np.asarray(array2)
    return np.sqrt(np.sum(difference ** 2))

# Example usage: distance between two 2D points, printed with 3 decimals
# (the captured cell output follows this block).
x1, y1 = [0.34080809, 0.56194119]
x2, y2 = [0.66353743, 0.5563285]

distance = compute_l2_distance([x1, y1], [x2, y2])
print(f"L2 distance between {[x1, y1]} and {[x2, y2]}: {distance:.3f}")

L2 distance between [0.34080809, 0.56194119] and [0.66353743, 0.5563285]: 0.323


In [59]:
def process_keypoints(keypoints_dict, resize_height_rate=1.0, resize_width_rate=1.0, move_x=0.0, move_y=0.0):
    """
    Resize and move keypoints in the dictionary while maintaining specific formats and rules.

    Every x coordinate becomes ``x * resize_width_rate + move_x`` and every y
    coordinate becomes ``y * resize_height_rate + move_y``; z values of the
    MediaPipe arrays are left untouched. The input dictionary and its arrays
    are never modified: all returned arrays are fresh copies.

    Args:
        keypoints_dict: Dictionary containing different types of keypoints.
            Recognised entries are 'bodies' (dict with 'candidate' and
            'subset'), 'hands', 'faces' and any key ending in '_mp'.
        resize_height_rate: float, rate to resize in height (y-direction)
        resize_width_rate: float, rate to resize in width (x-direction)
        move_x: float, displacement in x direction
        move_y: float, displacement in y direction

    Returns:
        Dictionary with processed keypoints (only the recognised entries)
    """
    result_dict = {}

    # Helper function to process 2D array of shape (..., 2)
    def process_2d_points(points):
        # A point is invalid (not detected) when x or y is below -0.9
        invalid_mask = np.any(points < -0.9, axis=-1)

        # Work on a copy so the caller's array is left untouched
        processed = points.copy()
        processed[..., 0] = points[..., 0] * resize_width_rate + move_x
        processed[..., 1] = points[..., 1] * resize_height_rate + move_y

        # Invalid points must stay recognisable: reset them to [-1, -1]
        processed[invalid_mask] = [-1, -1]
        return processed

    # Helper function to process 1D array laid out as [x0, y0, z0, x1, y1, z1, ...]
    def process_1d_points_with_z(points):
        if np.all(points == 0):
            # All zeros is left as is, but returned as a copy so the result
            # never aliases the input array
            return points.copy()

        n_points = len(points) // 3
        # reshape() returns a view of the input; copy it before writing,
        # otherwise the caller's array would be modified in place
        processed = points.reshape(n_points, 3).copy()

        # Process x and y, keep z unchanged
        processed[:, 0] = processed[:, 0] * resize_width_rate + move_x
        processed[:, 1] = processed[:, 1] * resize_height_rate + move_y

        return processed.reshape(-1)

    # Process DWPose body keypoints
    if 'bodies' in keypoints_dict:
        result_dict['bodies'] = {
            'candidate': process_2d_points(keypoints_dict['bodies']['candidate']),
            'subset': keypoints_dict['bodies']['subset'].copy()  # Keep subset unchanged
        }

    # Process DWPose hands
    if 'hands' in keypoints_dict:
        result_dict['hands'] = process_2d_points(keypoints_dict['hands'])

    # Process DWPose faces: only the first entry along the leading axis is
    # processed, then the leading axis is added back
    if 'faces' in keypoints_dict:
        faces = keypoints_dict['faces']
        processed_faces = process_2d_points(faces[0])
        result_dict['faces'] = processed_faces[np.newaxis, ...]

    # Process MediaPipe keypoints (all ending with _mp)
    for key in keypoints_dict:
        if key.endswith('_mp'):
            if key == 'confidence_score_mp':
                result_dict[key] = keypoints_dict[key]  # Keep confidence score unchanged
            else:
                result_dict[key] = process_1d_points_with_z(keypoints_dict[key])

    return result_dict

# Example usage
# Demo for process_keypoints: builds a tiny dictionary with every supported
# entry type and prints the transformed result.
if __name__ == "__main__":
    # Create a small example dictionary
    example_dict = {
        'bodies': {
            # The last row [-1, -1] is an invalid point and must come back as [-1, -1]
            'candidate': np.array([[0.5, 0.3], [0.6, 0.4], [-1, -1]]),
            'subset': np.array([[0., 1., 2.]])
        },
        'hands': np.array([[[0.7, 0.8], [0.9, 1.0]], [[0.1, 0.2], [-1, -1]]]),
        'faces': np.array([[[0.4, 0.3], [0.5, 0.4]]]),
        # Flat [x, y, z, x, y, z] layout: two keypoints
        'pose_mp': np.array([0.5, 0.3, 0.1, 0.6, 0.4, 0.2]),
        'left_hand_mp': np.zeros(63),  # Example with all zeros
        'confidence_score_mp': 3.0
    }
    
    # Process the keypoints
    # x -> x * 1.5 + 0.1 and y -> y * 2.0 - 0.2
    processed = process_keypoints(
        example_dict,
        resize_height_rate=2.0,
        resize_width_rate=1.5,
        move_x=0.1,
        move_y=-0.2
    )
    
    # Print results
    print("Processed keypoints:")
    for key in processed:
        print(f"\n{key}:")
        print(processed[key])

Processed keypoints:

bodies:
{'candidate': array([[ 0.85,  0.4 ],
       [ 1.  ,  0.6 ],
       [-1.  , -1.  ]]), 'subset': array([[0., 1., 2.]])}

hands:
[[[ 1.15  1.4 ]
  [ 1.45  1.8 ]]

 [[ 0.25  0.2 ]
  [-1.   -1.  ]]]

faces:
[[[0.7  0.4 ]
  [0.85 0.6 ]]]

pose_mp:
[0.85 0.4  0.1  1.   0.6  0.2 ]

left_hand_mp:
[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]

confidence_score_mp:
3.0


In [60]:
# function to actually implement normalization of keypoints
'''
std_sholder is the shoulder L2 distance (considering both x and y) in DWPose.
That is, the distance between Dict['bodies']['candidate'][2] and Dict['bodies']['candidate'][5].

std_neck is the neck-end-to-nose L2 distance in DWPose.
That is, the distance between Dict['bodies']['candidate'][0] and Dict['bodies']['candidate'][1].

std_sholder=0.323 and std_neck=0.197 are obtained from
../../data/SigningSavvy_Dict/new_2_key_vectors_indpt_h_adjust_DWPose/a/1-#-the-letter-a/2.pickle
That is, the first frame of lady Brenda signing the letter "a" in fingerspelling.

Similarly, we use this frame of lady Brenda to decide the center (the neck end, node 1 of the DWPose body):
x=0.50217276, y=0.55913485
We decide the move direction accordingly.
'''

# Coordinates below this value mark a keypoint as "not detected" (such points
# are stored as -1); process_keypoints uses the same threshold.
_INVALID_COORD_THRESHOLD = -0.9


def _valid_body_point(frame, index):
    """Return (x, y) of DWPose body keypoint `index` in `frame`, or None if it is marked invalid."""
    point = frame['bodies']['candidate'][index]
    x, y = point[0], point[1]
    if x < _INVALID_COORD_THRESHOLD or y < _INVALID_COORD_THRESHOLD:
        return None
    return x, y


def _mean_body_distance(frames, index_a, index_b):
    """Average L2 distance between two body keypoints over frames where both are valid (None if no such frame)."""
    distances = []
    for frame in frames:
        point_a = _valid_body_point(frame, index_a)
        point_b = _valid_body_point(frame, index_b)
        if point_a is None or point_b is None:
            continue
        distances.append(compute_l2_distance(point_a, point_b))
    if not distances:
        return None
    return sum(distances) / len(distances)


def do_normalization(path, path_out_, ID, model, 
                     std_sholder=0.323, std_neck=0.197, std_x=0.502, std_y=0.559):
    """
    Extract the keypoints of one video, normalize their scale and position, and pickle the result.

    The height rate is std_neck divided by the video's average nose-to-neck
    distance (body keypoints 0 and 1); the width rate is std_sholder divided by
    the average shoulder distance (body keypoints 2 and 5). Frames in which one
    of the two keypoints is marked invalid are left out of the averages. The
    translation is chosen so that the neck (body keypoint 1) of the first frame
    with a valid neck ends up exactly at (std_x, std_y) after scaling.

    Args:
        path: location of the input video, passed to obtain_key_vector
        path_out_: output prefix; the file written is path_out_ + str(ID) + '.pickle',
            so a directory must end with a path separator
        ID: identifier used as the output file name
        model: keypoint detector passed to obtain_key_vector
        std_sholder: reference shoulder distance
        std_neck: reference nose-to-neck distance
        std_x: reference x position of the neck
        std_y: reference y position of the neck

    Raises:
        ValueError: if no frame was extracted, or if the neck / shoulder
            distance cannot be measured (no valid frame, or zero distance).
    """
    #extract keypoint
    vec_array, fps, n_frames, f_H, f_W = obtain_key_vector(path, model)
    if len(vec_array) == 0:
        raise ValueError(f"No keypoint frames extracted from video: {path}")

    #average neck-end-to-nose and shoulder distances over the valid frames
    ave_neck = _mean_body_distance(vec_array, 0, 1)
    ave_sholder = _mean_body_distance(vec_array, 2, 5)
    if not ave_neck or not ave_sholder:
        # None (no valid frame) or 0.0 would make the resize rate undefined
        raise ValueError(f"Cannot measure neck/shoulder distance in video: {path}")

    #obtain the resize rate
    r_h = std_neck / ave_neck
    r_w = std_sholder / ave_sholder

    #unlike resize rate, we only use one frame to decide movement direction:
    #the first frame whose neck keypoint is valid (normally frame 0). A valid
    #neck exists because ave_neck could be computed above.
    anchor = None
    for frame in vec_array:
        anchor = _valid_body_point(frame, 1)
        if anchor is not None:
            break

    # process_keypoints computes x * rate + move, so the offset has to be taken
    # from the *scaled* neck position; using the raw position would leave the
    # neck at std + (rate - 1) * neck instead of at the reference point.
    mv_x = std_x - anchor[0] * r_w
    mv_y = std_y - anchor[1] * r_h

    #resize and move both the MediaPipe and the DWPose keypoints of every frame
    vec_array_new = [
        process_keypoints(frame,
                          resize_height_rate=r_h, resize_width_rate=r_w,
                          move_x=mv_x, move_y=mv_y)
        for frame in vec_array
    ]

    #save to file (kept as a defaultdict so the pickled type matches earlier outputs)
    Dict_ = defaultdict()
    Dict_['keypoint'] = vec_array_new
    Dict_['info'] = {'video_file_location': path,
                     'fps': fps, 'number_of_frames': n_frames, 'H': f_H, 'W': f_W}

    with open(path_out_ + str(ID) + '.pickle', 'wb') as handle:
        pickle.dump(Dict_, handle, protocol=pickle.HIGHEST_PROTOCOL)

In [61]:
def clean_and_create_folder(folder_path):
    """Make sure `folder_path` exists and is empty.

    A missing folder is created (including parents). An existing folder is
    kept, but everything inside it is deleted: files and symlinks are
    unlinked, sub-directories are removed recursively.
    """
    if not os.path.exists(folder_path):
        # Nothing to clean: just create the folder
        os.makedirs(folder_path)
        return

    for entry_name in os.listdir(folder_path):
        entry = os.path.join(folder_path, entry_name)
        removable_as_file = os.path.islink(entry) or os.path.isfile(entry)
        if removable_as_file:
            os.unlink(entry)
        elif os.path.isdir(entry):
            shutil.rmtree(entry)

In [62]:
def load_and_sort_videos(path_in):
    """Return the .mp4/.mov files directly inside `path_in`, ordered by numeric file name.

    Every video must be named with an integer (e.g. "36.mp4"), and each
    integer may appear only once across both extensions.

    Raises:
        ValueError: for a non-integer file name, a duplicated ID, or an
            unexpected extension.
    """
    root = Path(path_in)

    # Extensions are matched case-insensitively through the glob character classes
    candidates = [*root.glob('*.[mM][pP]4'), *root.glob('*.[mM][oO][vV]')]

    # Maps numeric ID -> video path; also used to detect duplicates
    path_by_id = {}

    for candidate in candidates:
        if candidate.suffix.lower() not in ('.mp4', '.mov'):
            raise ValueError(f"Invalid file format found: {candidate}")

        stem = candidate.stem
        try:
            numeric_id = int(stem)
        except ValueError:
            raise ValueError(f"Non-integer filename found: {stem}")

        if numeric_id in path_by_id:
            raise ValueError(f"Duplicate video ID found: {numeric_id}")

        path_by_id[numeric_id] = candidate

    # IDs are unique, so sorting the keys gives the final order
    return [str(path_by_id[numeric_id]) for numeric_id in sorted(path_by_id)]


######################
# List the input videos in numeric order; the bare `sorted_videos` expression
# on the last line only displays the list when run as a notebook cell.
path_in = "./input_videos/"
sorted_videos = load_and_sort_videos(path_in)
sorted_videos

['input_videos/8.mp4',
 'input_videos/36.mp4',
 'input_videos/468.mp4',
 'input_videos/487.mp4']

In [63]:
##############################
#########the class############
class KeyPointNormalization:
    """Normalize the keypoints of every video in an input folder.

    On construction the input videos are listed in numeric order and the output
    folder is created or emptied. `run` then writes one pickle per video,
    named by the video's position in that sorted list (0.pickle, 1.pickle, ...).
    """

    def __init__(self, model, path_in="./input_videos/", path_out="./results/"):
        """
        Args:
            model: keypoint detector handed to do_normalization for each video
            path_in: folder containing the integer-named .mp4/.mov videos
            path_out: output folder; must end with a path separator because
                do_normalization builds the file name by string concatenation.
                NOTE: any existing content of this folder is deleted.
        """
        self.path_in = path_in
        self.path_out = path_out
        self.model = model

        self.sorted_videos = load_and_sort_videos(self.path_in)
        clean_and_create_folder(self.path_out)

    def run(self):
        """Normalize each video in sorted order and write its pickle to the output folder."""
        for index, video_path in enumerate(self.sorted_videos):
            # Use the detector stored on the instance, not a global named `model`
            do_normalization(video_path, self.path_out, index, self.model)

In [64]:
# Initialize the model and other classes
# NOTE: constructing KeyPointNormalization empties the "./results/" folder.
# The recorded run of this cell failed with ONNXRuntime NO_SUCHFILE for
# ckpts/yolox_l.onnx, so the detector needs that checkpoint file on disk.
DW_model = DWposeDetector_canlin_no_output_img()
kp_normalizer = KeyPointNormalization(DW_model)

# Run key point extraction
kp_normalizer.run()



NoSuchFile: [ONNXRuntimeError] : 3 : NO_SUCHFILE : Load model from /Users/czhang/_Sorenson/_Projects/Sign_Language_Generation/code/kp_normalization_multi_video/ailab_DWPose/ControlNet-v1-1-nightly/annotator/dwpose/../ckpts/yolox_l.onnx failed:Load model /Users/czhang/_Sorenson/_Projects/Sign_Language_Generation/code/kp_normalization_multi_video/ailab_DWPose/ControlNet-v1-1-nightly/annotator/dwpose/../ckpts/yolox_l.onnx failed. File doesn't exist

In [1]:
import pickle
from pathlib import Path

def load_even_pickles(folder_path):
    """Return the names of the *.pickle files in `folder_path`, sorted by numeric ID.

    Every pickle must be named with an even integer, and each integer may
    appear only once. The names returned are the actual file names on disk, so
    a file such as "002.pickle" is reported as "002.pickle" rather than as a
    reconstructed (and non-existent) "2.pickle".

    Args:
        folder_path: folder to scan (not recursive)

    Returns:
        List of file names (without folder), ordered by their integer ID

    Raises:
        ValueError: for a non-integer name, an odd ID, or a duplicated ID
    """
    folder = Path(folder_path)

    # Maps numeric ID -> pickle path; also used to detect duplicates
    pickle_by_id = {}

    for pickle_path in folder.glob('*.pickle'):
        # Get filename without extension
        name = pickle_path.stem

        # Check if filename is integer
        try:
            file_id = int(name)
        except ValueError:
            raise ValueError(f"Non-integer filename found: {name}") from None

        # Check if it's even
        if file_id % 2 != 0:
            raise ValueError(f"Non-even integer filename found: {file_id}")

        # Check for duplicates (e.g. "2.pickle" and "002.pickle")
        if file_id in pickle_by_id:
            raise ValueError(f"Duplicate ID found: {file_id}")

        pickle_by_id[file_id] = pickle_path

    # Sort by ID and return the real file names
    return [pickle_by_id[file_id].name for file_id in sorted(pickle_by_id)]

# Example usage:
# Raises ValueError as soon as a pickle with a non-integer or odd name is found
# (the recorded run stopped on "9.pickle").
folder_path = "./results/"
sorted_filenames = load_even_pickles(folder_path)
print(sorted_filenames)

ValueError: Non-even integer filename found: 9

In [5]:
import pickle
from pathlib import Path

def load_pickles(folder_path):
    """Return the names of the *.pickle files in `folder_path`, sorted by numeric ID.

    Every pickle must be named with an integer, and each integer may appear
    only once. The names returned are the actual file names on disk, so a file
    such as "007.pickle" is reported as "007.pickle" rather than as a
    reconstructed (and non-existent) "7.pickle".

    Args:
        folder_path: folder to scan (not recursive)

    Returns:
        List of file names (without folder), ordered by their integer ID

    Raises:
        ValueError: for a non-integer name or a duplicated ID
    """
    folder = Path(folder_path)

    # Maps numeric ID -> pickle path; also used to detect duplicates
    pickle_by_id = {}

    for pickle_path in folder.glob('*.pickle'):
        # Get filename without extension
        name = pickle_path.stem

        # Check if filename is integer
        try:
            file_id = int(name)
        except ValueError:
            raise ValueError(f"Non-integer filename found: {name}") from None

        # Check for duplicates (e.g. "7.pickle" and "007.pickle")
        if file_id in pickle_by_id:
            raise ValueError(f"Duplicate ID found: {file_id}")

        pickle_by_id[file_id] = pickle_path

    # Sort by ID and return the real file names
    return [pickle_by_id[file_id].name for file_id in sorted(pickle_by_id)]

# Example usage:
# Raises ValueError if the folder holds a pickle whose name is not an integer
# (the recorded run stopped on "normalized_keypoints.pickle").
folder_path = "./results/"
sorted_filenames = load_pickles(folder_path)
print(sorted_filenames)

ValueError: Non-integer filename found: normalized_keypoints

In [None]:
    # Instead, we use only the first frame's shoulder distance, since the first frame is likely the steadiest, most front-facing pose
    x1 = vec_array[0]['bodies']['candidate'][2][0]
    y1 = vec_array[0]['bodies']['candidate'][2][1]
    x2 = vec_array[0]['bodies']['candidate'][5][0]
    y2 = vec_array[0]['bodies']['candidate'][5][1]
    fst_sholder = compute_l2_distance([x1, y1], [x2, y2])

In [9]:
import numpy as np

def calculate_3d_distance(point1, point2):
  """Return the Euclidean distance between two points given as coordinate sequences."""
  # Component-wise offset from point1 to point2, as a numpy array
  offset = np.array(point2) - np.array(point1)

  # The length of the offset vector is the distance
  return np.linalg.norm(offset)

# Example usage
# Two points one unit apart along the x axis; the expected distance is 1.00.
point1 = [1, 0, 0]
point2 = [2, 0, 0]

distance = calculate_3d_distance(point1, point2)
print(f"The distance between {point1} and {point2} is: {distance:.2f}")

The distance between [1, 0, 0] and [2, 0, 0] is: 1.00


In [13]:
def process_keypoints(keypoints_dict, resize_height_rate=1.0, resize_width_rate=1.0, move_x=0.0, move_y=0.0,
                   pose_mp_mask=None, dwpose_body_mask=None):
    """
    Resize and move keypoints in the dictionary while maintaining specific formats and rules.

    Every x coordinate becomes ``x * resize_width_rate + move_x`` and every y
    coordinate becomes ``y * resize_height_rate + move_y``; z values of the
    MediaPipe arrays are left untouched. The input dictionary and its arrays
    are never modified: all returned arrays are fresh copies.

    Args:
        keypoints_dict: Dictionary containing different types of keypoints
        resize_height_rate: float, rate to resize in height (y-direction)
        resize_width_rate: float, rate to resize in width (x-direction)
        move_x: float, displacement in x direction
        move_y: float, displacement in y direction
        pose_mp_mask: sequence of ints or None, keypoint indices to keep unchanged in pose_mp
                     (e.g., [11,12] means keeping keypoints 11,12 unchanged,
                      which corresponds to dimensions [33:39] in the array)
        dwpose_body_mask: sequence of ints or None, keypoint indices to keep unchanged in DWpose body
                         (e.g., [1,2,3] means keeping keypoints 1,2,3 unchanged)

    Returns:
        Dictionary with processed keypoints
    """
    result_dict = {}

    # Shared transform: returns a copy of `points` (x in [..., 0], y in [..., 1])
    # with x/y scaled and shifted. Rows (first axis) listed in mask_indices are
    # copied over unchanged.
    def scale_and_shift(points, mask_indices=None):
        processed = points.copy()

        if mask_indices is None:
            processed[..., 0] = points[..., 0] * resize_width_rate + move_x
            processed[..., 1] = points[..., 1] * resize_height_rate + move_y
            return processed

        # Boolean mask of the rows to change. Converting to an index array first
        # lets lists, tuples and ranges all work (a raw tuple would be read by
        # numpy as a multi-axis index).
        change_mask = np.ones(points.shape[0], dtype=bool)
        change_mask[np.asarray(list(mask_indices), dtype=np.intp)] = False

        processed[change_mask, 0] = points[change_mask, 0] * resize_width_rate + move_x
        processed[change_mask, 1] = points[change_mask, 1] * resize_height_rate + move_y
        return processed

    # Helper function to process 2D array of shape (..., 2)
    def process_2d_points(points, mask_indices=None):
        # A point is invalid (not detected) when x or y is below -0.9
        invalid_mask = np.any(points < -0.9, axis=-1)

        processed = scale_and_shift(points, mask_indices)

        # Invalid points must stay recognisable: reset them to [-1, -1]
        processed[invalid_mask] = [-1, -1]
        return processed

    # Helper function to process 1D array laid out as [x0, y0, z0, x1, y1, z1, ...]
    def process_1d_points_with_z(points, mask_indices=None):
        if np.all(points == 0):
            # All zeros is left as is, but returned as a copy so the result
            # never aliases the input array
            return points.copy()

        n_points = len(points) // 3
        reshaped = points.reshape(n_points, 3)

        # z (column 2) is not touched by scale_and_shift
        return scale_and_shift(reshaped, mask_indices).reshape(-1)

    # Process DWPose body keypoints
    if 'bodies' in keypoints_dict:
        result_dict['bodies'] = {
            'candidate': process_2d_points(keypoints_dict['bodies']['candidate'],
                                           mask_indices=dwpose_body_mask),
            'subset': keypoints_dict['bodies']['subset'].copy()  # Keep subset unchanged
        }

    # Process DWPose hands (process all points)
    if 'hands' in keypoints_dict:
        result_dict['hands'] = process_2d_points(keypoints_dict['hands'])

    # Process DWPose faces: only the first entry along the leading axis is
    # processed, then the leading axis is added back
    if 'faces' in keypoints_dict:
        faces = keypoints_dict['faces']
        processed_faces = process_2d_points(faces[0])
        result_dict['faces'] = processed_faces[np.newaxis, ...]

    # Process MediaPipe keypoints (all ending with _mp)
    for key in keypoints_dict:
        if key.endswith('_mp'):
            if key == 'confidence_score_mp':
                result_dict[key] = keypoints_dict[key]  # Keep confidence score unchanged
            elif key == 'pose_mp':
                # The mask holds keypoint indices, which are exactly the row
                # indices of the (n_points, 3) view, so it is used directly
                result_dict[key] = process_1d_points_with_z(keypoints_dict[key],
                                                            mask_indices=pose_mp_mask)
            else:
                # Process all points for other MediaPipe keypoints
                result_dict[key] = process_1d_points_with_z(keypoints_dict[key])

    return result_dict

# Example usage
# Demo for the masked variant of process_keypoints: selected body and pose
# keypoints are excluded from the resize/move transform.
if __name__ == "__main__":
  # Create a small example dictionary
  example_dict = {
      'bodies': {
          'candidate': np.array([[0.5, 0.3], [0.6, 0.4], [0.7, 0.5], 
                               [0.8, 0.6], [0.9, 0.7]]),  # 5 points for demonstration
          'subset': np.array([[0., 1., 2.]])
      },
      'hands': np.array([[[0.7, 0.8], [0.9, 1.0]], [[0.1, 0.2], [-1, -1]]]),
      'faces': np.array([[[0.4, 0.3], [0.5, 0.4]]]),
      'pose_mp': np.array([0.5, 0.3, 0.1, 0.6, 0.4, 0.2] * 20),  # Example array: 120 values = 40 keypoints
      'left_hand_mp': np.zeros(63),
      'confidence_score_mp': 3.0
  }
  
  # Process the keypoints with masking
  processed = process_keypoints(
      example_dict,
      resize_height_rate=2.0,
      resize_width_rate=1.5,
      move_x=0.1,
      move_y=-0.2,
      pose_mp_mask=[7, 8, 10, 11, 14],  # Keep pose keypoints 7, 8, 10, 11 and 14 unchanged
      dwpose_body_mask=[1, 3]  # Keep body keypoints 1 and 3 unchanged
  )

  print(processed)

{'bodies': {'candidate': array([[0.85, 0.4 ],
       [0.6 , 0.4 ],
       [1.15, 0.8 ],
       [0.8 , 0.6 ],
       [1.45, 1.2 ]]), 'subset': array([[0., 1., 2.]])}, 'hands': array([[[ 1.15,  1.4 ],
        [ 1.45,  1.8 ]],

       [[ 0.25,  0.2 ],
        [-1.  , -1.  ]]]), 'faces': array([[[0.7 , 0.4 ],
        [0.85, 0.6 ]]]), 'pose_mp': array([0.85, 0.4 , 0.1 , 1.  , 0.6 , 0.2 , 0.85, 0.4 , 0.1 , 1.  , 0.6 ,
       0.2 , 0.85, 0.4 , 0.1 , 1.  , 0.6 , 0.2 , 0.85, 0.4 , 0.1 , 0.6 ,
       0.4 , 0.2 , 0.5 , 0.3 , 0.1 , 1.  , 0.6 , 0.2 , 0.5 , 0.3 , 0.1 ,
       0.6 , 0.4 , 0.2 , 0.85, 0.4 , 0.1 , 1.  , 0.6 , 0.2 , 0.5 , 0.3 ,
       0.1 , 1.  , 0.6 , 0.2 , 0.85, 0.4 , 0.1 , 1.  , 0.6 , 0.2 , 0.85,
       0.4 , 0.1 , 1.  , 0.6 , 0.2 , 0.85, 0.4 , 0.1 , 1.  , 0.6 , 0.2 ,
       0.85, 0.4 , 0.1 , 1.  , 0.6 , 0.2 , 0.85, 0.4 , 0.1 , 1.  , 0.6 ,
       0.2 , 0.85, 0.4 , 0.1 , 1.  , 0.6 , 0.2 , 0.85, 0.4 , 0.1 , 1.  ,
       0.6 , 0.2 , 0.85, 0.4 , 0.1 , 1.  , 0.6 , 0.2 , 0.85, 0.4 , 0.1 ,


In [22]:
# Inspect pose keypoint 14 (values 42..44 of the flat x, y, z array). It is in
# the pose_mp_mask of the example, so it should still hold the input values.
processed['pose_mp'][3*14:3*14+3]

array([0.5, 0.3, 0.1])

In [25]:
# Keypoint indices 1 through 13
print(list(range(1, 14)))

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]


In [1]:
# Keypoint indices 0 through 20
print(list(range(21)))

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
