In [None]:
import numpy as np
import os

# --- Configuration -----------------------------------------------------------
# Input locations, relative to the notebook's working directory.
motion_dir = './Motion'            # one `<seq_name>.npy` per sequence
annotation_dir = './Annotations'   # one `<seq_name>.json` per sequence
llm_type = 'llama2'                # model name passed to `ollama.embeddings`
save_dir = '../'                   # NOTE(review): not referenced by the cells visible here

# Output archives written by the saving cell.
save_filename_motion = '../dataset_motion_smplID_20fps_avasplit.npz'
save_filename_text_high = '../dataset_text_high_avasplit.npz'
save_filename_text_low = '../dataset_text_low_avasplit.npz'

# Split definition files: one sequence name per line.
train_info = 'train_ava.txt'
test_info = 'test_ava.txt'


def read_sequence_names(split_file):
    """Return the sequence names listed in ``split_file``.

    Surrounding whitespace (including the trailing newline) is removed from
    every line, and blank lines are dropped so that a trailing empty line in
    the split file does not turn into an empty sequence name downstream.

    Args:
        split_file: path of a text file with one sequence name per line.

    Returns:
        List of non-empty, stripped sequence names in file order.
    """
    with open(split_file, 'r') as f:
        stripped_lines = (line.strip() for line in f)
        return [name for name in stripped_lines if name]


train_filenames = read_sequence_names(train_info)
test_filenames = read_sequence_names(test_info)


Motion (SMPL, 20 FPS)

In [None]:
""" 
SMPL (HumanML3D) 
"""
# Target joint layout: position i in this list is joint i of the converted sequence.
SMPL_JOINT_ORDER = [
    "Root",
    "Left Hip", "Right Hip",
    "Spine1",
    "Left Knee", "Right Knee",
    "Spine2",
    "Left Ankle", "Right Ankle",
    "Spine3",
    "Left Foot", "Right Foot",
    "Neck",
    "Left Collar", "Right Collar",
    "Head",
    "Left Shoulder", "Right Shoulder",
    "Left Elbow", "Right Elbow",
    "Left Wrist", "Right Wrist",
    "Prop",
]


# XSENS (BlindWays): joint layout of the raw motion files.
XSENS_JOINT_ORDER = [
    "Pelvis",
    "L5", "L3", "T12", "T8",
    "Neck", "Head",
    "Right Shoulder", "Right Upper Arm", "Right Forearm", "Right Hand",
    "Left Shoulder", "Left Upper Arm", "Left Forearm", "Left Hand",
    "Right Upper Leg", "Right Lower Leg", "Right Foot", "Right Toe",
    "Left Upper Leg", "Left Lower Leg", "Left Foot", "Left Toe",
    "Prop",
]

# XSENS to SMPL: entry i names the Xsens joint that fills SMPL_JOINT_ORDER[i].
# NOTE(review): the Xsens "Pelvis" joint is not referenced by this table
# (SMPL "Root" takes "L5") -- confirm this is the intended correspondence.
XSENS_to_SMPL = [
    "L5",                                    # Root
    "Left Upper Leg", "Right Upper Leg",     # Left/Right Hip
    "L3",                                    # Spine1
    "Left Lower Leg", "Right Lower Leg",     # Left/Right Knee
    "T12",                                   # Spine2
    "Left Foot", "Right Foot",               # Left/Right Ankle
    "T8",                                    # Spine3
    "Left Toe", "Right Toe",                 # Left/Right Foot
    "Neck",                                  # Neck
    "Left Shoulder", "Right Shoulder",       # Left/Right Collar
    "Head",                                  # Head
    "Left Upper Arm", "Right Upper Arm",     # Left/Right Shoulder
    "Left Forearm", "Right Forearm",         # Left/Right Elbow
    "Left Hand", "Right Hand",               # Left/Right Wrist
    "Prop",                                  # Prop
]

def xsens_to_smpl_seq(motion):
    """Reorder an Xsens joint sequence into the SMPL joint layout.

    SMPL slot ``i`` of the result is filled with the Xsens joint named
    ``XSENS_to_SMPL[i]``, located by its position in ``XSENS_JOINT_ORDER``.

    Args:
        motion: 3-D array indexed as (frame, xsens_joint, coordinate).

    Returns:
        A new float64 array of shape
        ``(motion.shape[0], len(SMPL_JOINT_ORDER), motion.shape[2])``.

    Raises:
        AssertionError: if ``XSENS_to_SMPL`` and ``SMPL_JOINT_ORDER`` differ in
            length (raised explicitly so the check survives ``python -O``).
        ValueError: if a name in ``XSENS_to_SMPL`` is missing from
            ``XSENS_JOINT_ORDER`` (propagated from ``list.index``).
    """
    num_smpl_joints = len(SMPL_JOINT_ORDER)

    # Validate the mapping table before doing any work on the data.
    if len(XSENS_to_SMPL) != num_smpl_joints:
        raise AssertionError(
            f"XSENS_to_SMPL has {len(XSENS_to_SMPL)} entries but "
            f"SMPL_JOINT_ORDER has {num_smpl_joints}"
        )

    # Resolve every SMPL slot to its source Xsens index once, then gather all
    # joints with a single fancy-index instead of a per-joint Python loop.
    xsens_indices = [XSENS_JOINT_ORDER.index(name) for name in XSENS_to_SMPL]
    motion = np.asarray(motion)

    # astype keeps the float64 result dtype regardless of the input dtype.
    return motion[:, xsens_indices, :].astype(np.float64)



In [None]:
# Keep every 3rd frame. Per the original notes the raw files are Xsens at
# 60 fps (600 x 24 x 3), so this gives 20 fps (200 x 24 x 3) -- not verified here.
FRAME_STRIDE = 3


def load_motion_split_smpl_20fps(filenames):
    """Load, downsample and SMPL-reorder every sequence of one split.

    For each name, ``{motion_dir}/{name}.npy`` is loaded, every
    ``FRAME_STRIDE``-th frame is kept, and the joints are reordered with
    ``xsens_to_smpl_seq``.

    Args:
        filenames: iterable of sequence names; surrounding whitespace is
            stripped and blank entries are skipped.

    Returns:
        Dict mapping sequence name to the converted motion array.
    """
    dataset = {}
    for seq_name in filenames:
        seq_name = seq_name.strip()
        if not seq_name:
            # A blank line in the split file is not a sequence.
            continue

        motion_xsens = np.load(f'{motion_dir}/{seq_name}.npy')
        dataset[seq_name] = xsens_to_smpl_seq(motion_xsens[::FRAME_STRIDE])
    return dataset


train_dataset_smpl_20fps = load_motion_split_smpl_20fps(train_filenames)
test_dataset_smpl_20fps = load_motion_split_smpl_20fps(test_filenames)
    

Text Embedding - llama2

In [None]:
import ollama
import time
import json

def embed_text(prompt):
    """Return the ``llm_type`` embedding of ``prompt`` as a float64 vector.

    The vector is read from the ``'embedding'`` entry of the
    ``ollama.embeddings`` response. The original notes give its size as
    (4096,) -- not verified here.
    """
    response = ollama.embeddings(model=llm_type, prompt=prompt)
    return np.array(response['embedding'], dtype=np.float64)


def embed_split_annotations(filenames):
    """Embed the high- and low-level annotation of every sequence of one split.

    For each name, ``{annotation_dir}/{name}.json`` is read and its
    ``'highlevel'`` and ``'lowlevel'`` entries are embedded separately.

    Args:
        filenames: iterable of sequence names; surrounding whitespace is
            stripped and blank entries are skipped.

    Returns:
        Tuple ``(high, low)`` of dicts mapping sequence name to embedding.
    """
    embeddings_high = {}
    embeddings_low = {}
    for seq_name in filenames:
        seq_name = seq_name.strip()
        if not seq_name:
            # A blank line in the split file is not a sequence.
            continue

        with open(f'{annotation_dir}/{seq_name}.json', 'r') as file:
            anno = json.load(file)

        embeddings_high[seq_name] = embed_text(anno['highlevel'])
        embeddings_low[seq_name] = embed_text(anno['lowlevel'])
    return embeddings_high, embeddings_low


train_dataset_text_high, train_dataset_text_low = embed_split_annotations(train_filenames)
test_dataset_text_high, test_dataset_text_low = embed_split_annotations(test_filenames)

In [None]:

# Each archive holds a single entry named 'data' whose value is a nested dict
# {'s_train': {seq_name: array}, 's_test': {seq_name: array}}.
# NOTE(review): the value is a Python dict, so reading it back presumably
# needs np.load(..., allow_pickle=True) -- confirm against the consumer.
final_data_text_high = {
    'data': {'s_train': train_dataset_text_high, 's_test': test_dataset_text_high},
}
final_data_text_low = {
    'data': {'s_train': train_dataset_text_low, 's_test': test_dataset_text_low},
}
final_data_skel = {
    'data': {'s_train': train_dataset_smpl_20fps, 's_test': test_dataset_smpl_20fps},
}

# Write order matches the original: high-level text, low-level text, motion.
for archive_path, payload in (
    (save_filename_text_high, final_data_text_high),
    (save_filename_text_low, final_data_text_low),
    (save_filename_motion, final_data_skel),
):
    np.savez(archive_path, **payload)


Visualize Data (Sanity Check) - SMPL Joint Representation

In [None]:
import matplotlib.pyplot as plt
import matplotlib as mpl
import matplotlib.gridspec as gridspec
import subprocess

# Side recorded per participant ID (used as the cane hand in this notebook).
HAND_DICT = {
    'P01': 'right', 'P02': 'right', 'P03': 'right', 'P04': 'right',
    'P05': 'left',
    'P06': 'right', 'P07': 'right', 'P08': 'right', 'P09': 'right',
    'P10': 'left',
}

# Joint names in array order: index i of a pose is SMPL_JOINT_ORDER[i].
SMPL_JOINT_ORDER = [
    "Root",
    "Left Hip", "Right Hip",
    "Spine1",
    "Left Knee", "Right Knee",
    "Spine2",
    "Left Ankle", "Right Ankle",
    "Spine3",
    "Left Foot", "Right Foot",
    "Neck",
    "Left Collar", "Right Collar",
    "Head",
    "Left Shoulder", "Right Shoulder",
    "Left Elbow", "Right Elbow",
    "Left Wrist", "Right Wrist",
    "Prop",
]

# Body segments as (side, start joint, end joint); shared by both pair tables.
_SMPL_BODY_PAIRS = [
    ("left", "Root", "Left Hip"),
    ("right", "Root", "Right Hip"),
    ("center", "Root", "Spine1"),
    ("center", "Spine1", "Spine2"),
    ("center", "Spine2", "Spine3"),
    ("center", "Spine3", "Neck"),
    ("center", "Neck", "Head"),
    ("left", "Left Collar", "Left Shoulder"),
    ("right", "Right Collar", "Right Shoulder"),
    ("left", "Left Shoulder", "Left Elbow"),
    ("right", "Right Shoulder", "Right Elbow"),
    ("left", "Left Elbow", "Left Wrist"),
    ("right", "Right Elbow", "Right Wrist"),
    ("left", "Left Hip", "Left Knee"),
    ("right", "Right Hip", "Right Knee"),
    ("left", "Left Knee", "Left Ankle"),
    ("right", "Right Knee", "Right Ankle"),
    ("left", "Left Ankle", "Left Foot"),
    ("right", "Right Ankle", "Right Foot"),
    ("left", "Spine3", "Left Collar"),
    ("right", "Spine3", "Right Collar"),
]

# The two tables differ only in which wrist the cane ("Prop") is attached to.
SMPL_JOINT_PAIRS_LeftHanded = _SMPL_BODY_PAIRS + [("cane", "Left Wrist", "Prop")]
SMPL_JOINT_PAIRS_RightHanded = _SMPL_BODY_PAIRS + [("cane", "Right Wrist", "Prop")]

CANE_AVG_LENGTH = 0.7
def get_joint_pair_between_cane_and_ground(motion, joint_order, joint_pairs, hand_side):
    """Append a synthetic 'Prop_extended' joint that prolongs the cane.

    For every frame the wrist -> Prop direction is normalised, scaled to
    ``CANE_AVG_LENGTH`` and added to the Prop position; the resulting point is
    appended as a new last joint named ``'Prop_extended'``.

    Args:
        motion: array indexed as (frame, joint, coordinate), with joints laid
            out according to ``joint_order``.
        joint_order: sequence of joint names; must contain ``'Prop'`` and the
            wrist of the selected hand.
        joint_pairs: sequence of ``(side, start_joint, end_joint)`` tuples.
        hand_side: ``'right'`` selects ``'Right Wrist'``; any other value
            selects ``'Left Wrist'``.

    Returns:
        Tuple ``(motion, joint_order, joint_pairs)``: new objects holding the
        extra joint, the extra joint name and the cane segments. The inputs
        are not modified.

    Raises:
        ValueError: if ``'Prop'`` or the selected wrist is missing from
            ``joint_order`` (propagated from ``list.index``).
    """
    wrist_name = 'Right Wrist' if hand_side == 'right' else 'Left Wrist'

    # Work on copies so the caller's lists are left untouched.
    joint_order = list(joint_order)
    joint_pairs = list(joint_pairs)

    prop = motion[:, joint_order.index("Prop")]
    wrist = motion[:, joint_order.index(wrist_name)]
    direction = prop - wrist

    # A frame where Prop coincides with the wrist has no direction; dividing
    # by 1 there keeps the offset at zero instead of producing NaNs (0 / 0).
    length = np.linalg.norm(direction, axis=1, keepdims=True)
    safe_length = np.where(length > 0, length, 1.0)
    extended = prop + direction / safe_length * CANE_AVG_LENGTH

    # np.concatenate allocates a new array, so the input motion is not altered.
    motion = np.concatenate((motion, extended[:, None, :]), axis=1)

    # The supplied pairs may already hold the wrist -> Prop segment; only add
    # segments that are missing so none is listed (and later drawn) twice.
    for pair in (('cane', wrist_name, 'Prop'), ('cane', 'Prop', 'Prop_extended')):
        if pair not in joint_pairs:
            joint_pairs.append(pair)
    joint_order.append('Prop_extended')

    return motion, joint_order, joint_pairs

COLOR_DICT = {'Left': '#DB9CC6', 'Center': 'k', 'Right': '#B6508C', 'Cane' : 'w'}
def PLOT_CARLA_3D_FINAL(SAVE_DIR, kpts3d, SMPL_JOINT_ORDER_w_cane_extended, SMPL_JOINT_PAIRS_w_cane_extended):
    """Render one pose as a 3-D stick figure on black and save it to a file.

    Args:
        SAVE_DIR: output image path handed to ``Figure.savefig`` (despite the
            name it is a file path, not a directory).
        kpts3d: joint positions of a single frame, indexed by joint; assumed
            to be (num_joints, 3) with x, y, z per row -- TODO confirm.
        SMPL_JOINT_ORDER_w_cane_extended: joint names in the row order of
            ``kpts3d``.
        SMPL_JOINT_PAIRS_w_cane_extended: iterable of
            ``(side, start_joint, end_joint)`` segments to draw.

    Side effects:
        Writes the image and closes the figure, also when drawing fails.
    """
    # 'left' previously fell through to the right-side colour, leaving
    # COLOR_DICT['Left'] unused. Segments with any other side (e.g. 'center')
    # keep the right-side colour as before: COLOR_DICT['Center'] is 'k', which
    # would not be visible against the black background used here.
    side_colors = {
        'left': COLOR_DICT['Left'],
        'right': COLOR_DICT['Right'],
        'cane': COLOR_DICT['Cane'],
    }
    fallback_color = COLOR_DICT['Right']

    fig = plt.figure(facecolor='black')
    try:
        ax = fig.add_subplot(111, projection='3d', facecolor='black')

        for side, sname, ename in SMPL_JOINT_PAIRS_w_cane_extended:
            sjoint = kpts3d[SMPL_JOINT_ORDER_w_cane_extended.index(sname)]
            ejoint = kpts3d[SMPL_JOINT_ORDER_w_cane_extended.index(ename)]

            # Column 0 is the start point, column 1 the end point; the rows
            # are passed to plot() as the x, y and z coordinates.
            segment = np.stack((sjoint, ejoint), axis=1)
            ax.plot(segment[0, :], segment[1, :], segment[2, :], '-',
                    color=side_colors.get(side, fallback_color),
                    alpha=0.95, lw=6,
                    solid_capstyle="round", solid_joinstyle="round")

        # Read the data limits before the ticks are removed; they set the box
        # aspect below so that all three axes share the same scale.
        xlim = ax.get_xlim3d()
        ylim = ax.get_ylim3d()
        zlim = ax.get_zlim3d()

        ax.set_xticks([])
        ax.set_yticks([])
        ax.set_zticks([])

        # Hide the panes and axis lines with a fully transparent colour.
        transparent = (1.0, 1.0, 1.0, 0.0)
        for axis in (ax.xaxis, ax.yaxis, ax.zaxis):
            axis.set_pane_color(transparent)
            axis.line.set_color(transparent)

        # Remove the grid and set the background color
        ax.grid(False)
        ax.set_facecolor('black')
        fig.patch.set_facecolor('black')
        ax.set_box_aspect((xlim[1]-xlim[0], ylim[1]-ylim[0], zlim[1]-zlim[0]))
        fig.tight_layout(pad=0)

        fig.savefig(SAVE_DIR, bbox_inches='tight', pad_inches=0, dpi=500)
    finally:
        # Called once per frame: close this figure explicitly so figures do
        # not accumulate in memory, including when an error is raised.
        plt.close(fig)
                
                
def save_video(input_file_name, output_file_name):
    """Encode the images matched by a glob pattern into a video using FFmpeg.

    Args:
        input_file_name: glob pattern (or single file) passed to FFmpeg's
            ``-i`` with ``-pattern_type glob``.
        output_file_name: path of the video to write; an existing file is
            overwritten (``-y``).

    Returns:
        None. Failures are reported with ``print`` and never raised.
    """
    # Round both dimensions down to an even number and resample to 20 fps.
    video_filter = "scale=trunc(iw/2)*2:trunc(ih/2)*2,fps=20"

    ffmpeg_args = ['ffmpeg', '-y']
    ffmpeg_args += ['-pattern_type', 'glob', '-i', input_file_name]
    ffmpeg_args += ['-vf', video_filter]
    ffmpeg_args += ['-pix_fmt', 'yuv420p']  # pixel format chosen for compatibility
    ffmpeg_args.append(output_file_name)

    try:
        # check=True turns a non-zero FFmpeg exit status into CalledProcessError.
        subprocess.run(ffmpeg_args, check=True)
        print(f"Video is saved as '{output_file_name}'")
    except subprocess.CalledProcessError as ffmpeg_error:
        print(f"FFmpeg failed to process the file: {ffmpeg_error}")
    except Exception as other_error:
        # E.g. the ffmpeg executable cannot be launched at all.
        print(f"An error occurred: {other_error}")

In [None]:
# Sequence rendered by the sanity check. The key must be present in the test
# split dict built earlier; otherwise the lookup raises KeyError.
sample_data_key = 'P08_0139'
sample_data = test_dataset_smpl_20fps[sample_data_key]

In [None]:
os.makedirs(f'sanity_check/{sample_data_key}', exist_ok=True)

# Look the cane hand up in HAND_DICT instead of hard-coding participant IDs.
# Assumes keys look like '<participant>_<index>' (e.g. 'P08_0139'); unknown
# participants fall back to 'right', as the original else-branch did.
participant_id = sample_data_key.split('_')[0]
hand_side = HAND_DICT.get(participant_id, 'right')
base_joint_pairs = SMPL_JOINT_PAIRS_LeftHanded if hand_side == 'left' else SMPL_JOINT_PAIRS_RightHanded

# Add the synthetic 'Prop_extended' joint and the cane segments for drawing.
motion_w_cane_extended_gt, SMPL_JOINT_ORDER_w_cane_extended, SMPL_JOINT_PAIRS_w_cane_extended = get_joint_pair_between_cane_and_ground(
    sample_data, SMPL_JOINT_ORDER, base_joint_pairs, hand_side
)

# visualize gt: one image per frame, zero-padded so the glob sorts in frame order
for t, kpts in enumerate(motion_w_cane_extended_gt):
    PLOT_CARLA_3D_FINAL(f'sanity_check/{sample_data_key}/{t:012d}.jpg', kpts, SMPL_JOINT_ORDER_w_cane_extended, SMPL_JOINT_PAIRS_w_cane_extended)
save_video(f'sanity_check/{sample_data_key}/*.jpg', f'sanity_check/{sample_data_key}.mp4')
    