In [1]:
import numpy as np
import pickle
import os
import io
import torch
from joints import MAIN_JOINTS
from typing import List

In [2]:
class CPU_Unpickler(pickle.Unpickler):
  def find_class(self, module, name): 
    if module == 'torch.storage' and name == '_load_from_bytes':
      return lambda b: torch.load(io.BytesIO(b), map_location='cpu')
    else:
      return super().find_class(module, name )

def read_pickle(path):
    with open(path, 'rb') as f:
      return CPU_Unpickler(f).load()

In [3]:
data = read_pickle("action_dataset_joints_leg_sampled_5.pkl")


print(data["train"][0][0].shape)

(5, 84)


  return super().find_class(module, name )


In [4]:
def reshape_joints(input_array: np.array) -> np.array:
    return np.reshape(-1, 28, 3)


In [5]:
joint_names = [
    'pelvis', 'left_hip', 'right_hip',
    'left_elbow', 'right_elbow', 'left_wrist', 'right_wrist',
    'head', 'jaw', 'nose', 'right_eye', 'left_eye', 'right_ear', 'left_ear',
    'left_shoulder', 'right_shoulder', 'left_collar', 'right_collar', 'neck',
    'spine1', 'spine2', 'spine3',
    'left_knee', 'right_knee', 'left_ankle', 'right_ankle', 'left_foot', 'right_foot'
]

spatial_edges = [
    # Head cluster
    ('left_eye', 'head'), ('right_eye', 'head'), ('left_ear', 'head'),
    ('right_ear', 'head'), ('nose', 'head'), ('jaw', 'head'),

    # Left arm
    ('left_wrist', 'left_elbow'), ('left_elbow', 'left_shoulder'),
    ('left_shoulder', 'left_collar'),

    # Right arm
    ('right_wrist', 'right_elbow'), ('right_elbow', 'right_shoulder'),
    ('right_shoulder', 'right_collar'),

    # Spine
    ('spine1', 'spine2'), ('spine2', 'spine3'),

    # Pelvis/hip
    ('right_hip', 'pelvis'), ('left_hip', 'pelvis'), ('pelvis', 'spine3'),

    # Legs
    ('right_foot', 'right_ankle'), ('right_ankle', 'right_knee'), ('right_knee', 'right_hip'),
    ('left_foot', 'left_ankle'), ('left_ankle', 'left_knee'), ('left_knee', 'left_hip'),

    # Shoulder to spine
    ('right_collar', 'spine1'), ('left_collar', 'spine1'),

    # Head to spine
    ('head', 'spine1'),
]


def get_filtered_joint_list(exclude_groups: List) -> List:
    joint_groups = {
        "pelvic_joints": ['pelvis', 'left_hip', 'right_hip'],
        "arm_joints": ['left_elbow','right_elbow','left_wrist','right_wrist'],
        "head_joints": ['head','jaw','nose','right_eye','left_eye','right_ear','left_ear'],
        "thorax_joints": ['left_shoulder','right_shoulder','left_collar','right_collar','neck'],
        "leg_joints": ['left_knee','right_knee','left_ankle','right_ankle','left_foot','right_foot'],
        "spine_joints": ['spine1', 'spine2', 'spine3']
    }

    # Flatten group names into a set of excluded joint names
    excluded = set()
    for group in exclude_groups:
        excluded.update(joint_groups[group])

    # Final joint list after filtering
    final_joint_list = [j for j in joint_names if j not in excluded]
    return final_joint_list

def filter_edges(joint_list: List, spatial_edges: List) -> List:
    joint_set = set(joint_list)
    filtered_edges = [(a, b) for a, b in spatial_edges if a in joint_set and b in joint_set]
    return filtered_edges

def build_edge_list(joint_list: List, spatial_edges: List, num_frames=150):
    joint_idx = {name: i for i, name in enumerate(joint_list)}
    N = len(joint_list)
    total_nodes = N * num_frames

    rows, cols = [], []

    for t in range(num_frames):
        offset = t * N

        # spatial connections within the frame
        for a, b in spatial_edges:
            i, j = joint_idx[a] + offset, joint_idx[b] + offset
            rows += [i, j]
            cols += [j, i]

        # temporal connections between same joints across frames
        if t < num_frames - 1:
            next_offset = (t + 1) * N
            for i in range(N):
                rows += [offset + i, next_offset + i]
                cols += [next_offset + i, offset + i]

    return torch.tensor([rows, cols], dtype = torch.long)

def build_weighted_edge_list(joint_list, spatial_edges, num_frames = 150):
    joint_idx = {name: i for i, name in enumerate(joint_list)}
    N = len(joint_list)
    total_nodes = N * num_frames
    edge_weights = torch.tensor([], dtype=torch.long)
    
    rows, cols = [], []
    
    for t in range(num_frames):
        offset = t * N

        for a, b in spatial_edges:
            i, j = joint_idx[a] + offset, joint_idx[b] + offset
            rows += [i, j]
            cols += [j, i]
            add_weights = torch.tensor([[1,0], [1,0]])
            edge_weights = torch.cat([edge_weights, add_weights], dim=0)
            
        
        if t < num_frames - 1:
            next_offset = (t + 1) * N
            for i in range(N):
                rows += [offset + i, next_offset + i]
                cols += [next_offset + i, offset + i]
                add_weights = torch.tensor([[0,1], [0,1]])
                edge_weights = torch.cat([edge_weights, add_weights], dim=0)
            
        
    return torch.tensor([rows, cols], dtype= torch.long), edge_weights
        
    
    

In [39]:
# Test 
frame = data['train'][0][0].reshape(-1, 28, 3)
included_joints = get_filtered_joint_list(exclude_groups=['head_joints', 'thorax_joints', 'leg_joints', 'spine_joints'])
filtered_edges = filter_edges(included_joints, spatial_edges)
distance, wrist, elbow = feature_engineering_avg(frame, included_joints)
adj, edge_feat = build_weighted_edge_list(included_joints, filtered_edges, num_frames=3)

print(included_joints)
print(distance)
print(wrist)
print(elbow)

['pelvis', 'left_hip', 'right_hip', 'left_elbow', 'right_elbow', 'left_wrist', 'right_wrist']
0.060586313053780164
0.12035102879787271
0.406564613109361


In [42]:
def build_node_list(exluded_groups : List, frames: np.array) -> np.array:
     joints_list = get_filtered_joint_list(exluded_groups)
     joint_indices = [MAIN_JOINTS.index(joint) for joint in joints_list]
     reshaped_frames = reshape_joints(frames)
     return reshaped_frames[:, joint_indices].reshape(-1,3)

     

In [43]:
#test
frames = read_pickle("action_dataset_joints_leg_sampled_150.pkl")
frames['train'][0]
data = build_node_list([], frames['train'][0][0])
reshaped_frame = reshape_joints(frames['train'][0][0])

#should be true
print(reshaped_frame.shape == data.shape)
print(data.shape)


False
(4200, 3)


  return super().find_class(module, name )


In [41]:
print(adj.max())

tensor(4199)


In [8]:
print((frames['train'][0][0]))

NameError: name 'frames' is not defined

In [38]:

def feature_engineering(training_frames, joint_list, wrist_threshold, elbow_threshold):
    
    distance_traveled = 0
    hand_wrist_distance = 0
    arm_joints = ['left_elbow','right_elbow','left_wrist','right_wrist']
    
    
    if 'pelvis' in joint_list:
        pelvis_all = training_frames[:, joint_names.index('pelvis')]  
        distance_traveled = np.linalg.norm(pelvis_all[1:] - pelvis_all[:-1], axis=1).sum()
   
    has_arm_joints = all(joint in joint_list for joint in arm_joints)
    
    if has_arm_joints:
        wrist_distance = np.linalg.norm(training_frames[:, joint_names.index('left_wrist')] - training_frames[:, joint_names.index('right_wrist')], axis=1)
        elbow_distance = np.linalg.norm(training_frames[:, joint_names.index('left_elbow')] - training_frames[:, joint_names.index('right_elbow')], axis=1)
        hand_wrist_distance = np.sum((wrist_distance < wrist_threshold) & (elbow_distance < elbow_threshold))
        
    return distance_traveled, hand_wrist_distance 


def feature_engineering_avg(training_frames, joint_list):
    
    distance_traveled = 0
    wrist_avg = 0
    elbow_avg = 0
    
    arm_joints = ['left_elbow','right_elbow','left_wrist','right_wrist']
    
    
    if 'pelvis' in joint_list:
        pelvis_all = training_frames[:, joint_names.index('pelvis')]  
        distance_traveled = np.linalg.norm(pelvis_all[1:] - pelvis_all[:-1], axis=1).sum()
   
    has_arm_joints = all(joint in joint_list for joint in arm_joints)
    
    if has_arm_joints:
        wrist_distance = np.linalg.norm(training_frames[:, joint_names.index('left_wrist')] - training_frames[:, joint_names.index('right_wrist')], axis=1)
        elbow_distance = np.linalg.norm(training_frames[:, joint_names.index('left_elbow')] - training_frames[:, joint_names.index('right_elbow')], axis=1)
        wrist_avg, elbow_avg = wrist_distance.mean(), elbow_distance.mean()
        
    return distance_traveled, wrist_avg, elbow_avg