In [13]:
import json
import re
import os
from pathlib import Path

class fubot_config():
    """Generator configuration loaded from a JSON file.

    The parsed JSON document is kept verbatim in `params`. Its
    `training.input_features` and `training.output_features` lists are expanded
    into `feat_group` objects, a flat list of column headers and a total scalar
    count, separately for inputs and outputs.
    """

    class feat_group():
        """A named group of scalar features taken from one bone.

        Built from a config entry `[name, bone_name, [element, ...]]`; an empty
        `bone_name` falls back to `name`. Each element has the form
        `<type>[<components>]_<space>`, e.g. `pos[xyz]_w`.
        """

        class feat_type():
            """One scalar feature described by space (S), type (T) and component (C)."""

            lut_C = {'w': 1, 'x':2, 'y':3, 'z':4}
            lut_C_inv = list(lut_C.keys())
            lut_S = {'l': 1, 'w':2}
            lut_T = {'pos': 1, 'qrot':2, 'rot':3}

            def __init__(self, S, T, C):
                """Resolve the textual keys to their numeric codes.

                Args:
                    S: space key, 'l' or 'w'.
                    T: type key, 'pos', 'qrot' or 'rot'.
                    C: component key, one of 'w', 'x', 'y', 'z'.

                Unknown keys resolve to 0, which makes `valid` False instead of
                raising KeyError, so callers can report and skip the element.
                """
                self.S = self.lut_S.get(S, 0)
                self.T = self.lut_T.get(T, 0)
                self.C = self.lut_C.get(C, 0)
                # Quaternions are indexed w,x,y,z (offset 1); every other type
                # is indexed x,y,z (offset 2).
                self.C_id = self.C - (1 if self.T == 2 else 2)
                # A negative index (e.g. 'w' on a position) would silently wrap
                # around to the last component, so treat it as invalid.
                self.valid = (self.S * self.T * self.C) > 0 and self.C_id >= 0

            def getValue(self, v_local, v_world):
                """Return component `C_id` from `v_local` (S == 1) or `v_world`."""
                return v_local[self.C_id] if self.S == 1 else v_world[self.C_id]

            def getPostFix(self):
                """Return the header suffix: `_p<c>` for positions, `_r<c>` otherwise."""
                pt = 'p' if self.T == 1 else 'r'
                pc = self.lut_C_inv[self.C - 1]

                return f'_{pt}{pc}'

        def __init__(self, feature_config):
            """Parse one `[name, bone_name, [element, ...]]` config entry."""
            self.name = None
            self.bone_name = None
            self.elements = []
            self.size = 0
            self.rgx_el = re.compile(r'(.+)\[(.+)\]_(.)')
            self.__parse__(feature_config)

        def __parse__(self, feature_config):
            """Fill `name`, `bone_name`, `elements` and `size` from the config entry.

            Elements that do not match the expected pattern, or whose keys are
            unknown, are reported with `print` and skipped.
            """
            self.name = feature_config[0]
            self.bone_name = feature_config[1] or self.name

            #Feature Elements
            for f_el in feature_config[2]:
                m = self.rgx_el.match(f_el)
                if m is None:
                    # No match: report it instead of failing on m.groups().
                    print(f'Invalid feature element found. ({self.name} >> {f_el})')
                    continue

                f_type, f_comps, f_space = m.groups()
                for comp in f_comps:
                    ft = self.feat_type(f_space, f_type, comp)
                    if not ft.valid:
                        print(f'Invalid feature element found. ({self.name} >> {f_el} [{f_space}, {f_type}, {comp}])')
                        continue

                    self.elements.append(ft)

            self.size = len(self.elements)

    def __init__(self, config_path):
        """Load the configuration stored at `config_path`.

        Args:
            config_path: path of the JSON configuration file.
        """
        self.features_output = []          # feat_group objects for the outputs
        self.features_output_header = []   # one column name per output scalar
        self.features_output_size = 0

        self.features_input = []           # feat_group objects for the inputs
        self.features_input_header = []    # one column name per input scalar
        self.features_input_size = 0
        self.params = None

        self.config_path = config_path

        self.__loadConfig__(config_path)

    def _expand_features(self, features, groups, header):
        """Append parsed groups and header names; return the number of scalars added."""
        added = 0
        if features is None:
            return added

        for feat in features:
            fg = self.feat_group(feat)
            groups.append(fg)
            added += fg.size
            header.extend(f'{fg.bone_name}{ft.getPostFix()}' for ft in fg.elements)

        return added

    def __loadConfig__(self, config_path):
        """Read the JSON file and expand the training input/output features."""
        with open(config_path) as f:
            self.params = json.load(f)

        #Training Features
        #input
        self.features_input_size += self._expand_features(
            self.params['training']['input_features'],
            self.features_input,
            self.features_input_header)

        #output
        self.features_output_size += self._expand_features(
            self.params['training']['output_features'],
            self.features_output,
            self.features_output_header)

    def set_working_directory(self):
        """Change the working directory to the folder containing the config file.

        Does nothing when that folder does not exist or already is the current
        working directory.
        """
        p = Path(self.config_path)
        if not os.path.exists(p.parent):
            return

        if not os.path.samefile(p.parent, os.getcwd()):
            os.chdir(p.parent)

# Parse the default 6-point sample configuration and dump its raw JSON parameters.
config = fubot_config(config_path='../samples/config_default6p.json')
print(config.params)

{'fubot_root': '../../', 'output_root': 'output', 'output_name': 'dataset_6p', 'split': {'categories': ['train', 'val', 'test'], 'ratios': [0.65, 0.25, 0.1], 'min_chunk_size': 30, 'max_chunk_size': 300, 'seed': 0}, 'training': {'input_features': [['Head', '', ['pos[xyz]_w', 'qrot[wxyz]_w']], ['Hips', '', ['pos[xyz]_w', 'qrot[wxyz]_w']], ['LeftHand', '', ['pos[xyz]_w', 'qrot[wxyz]_w']], ['RightHand', '', ['pos[xyz]_w', 'qrot[wxyz]_w']], ['LeftToes', '', ['pos[xyz]_w', 'qrot[wxyz]_w']], ['RightToes', '', ['pos[xyz]_w', 'qrot[wxyz]_w']]], 'output_features': [['Hips', '', ['pos[xyz]_l', 'qrot[wxyz]_l']], ['Spine', '', ['qrot[wxyz]_l']], ['Chest', '', ['qrot[wxyz]_l']], ['UpperChest', '', ['qrot[wxyz]_l']], ['Neck', '', ['qrot[wxyz]_l']], ['Head', '', ['qrot[wxyz]_l']], ['LeftShoulder', '', ['qrot[wxyz]_l']], ['LeftUpperArm', '', ['qrot[wxyz]_l']], ['LeftLowerArm', '', ['qrot[wxyz]_l']], ['LeftHand', '', ['qrot[wxyz]_l']], ['RightShoulder', '', ['qrot[wxyz]_l']], ['RightUpperArm', '', ['qro

In [2]:
from collections import OrderedDict
from bvhtoolbox import Bvh
import transforms3d as t3d
from math import radians
import numpy as np
import pandas as pd
import os

class bvh_skeleton_node:
    """A joint in a skeleton hierarchy with local and derived world transforms.

    Local state (`local_position`, `local_rotation_quat`) is set by the caller;
    the world-space fields are filled in by `CalculateWorld`, which must be
    called on the root so that parents are resolved before their children.
    """

    def __init__(self, name, offset):
        """Create an unparented node.

        Args:
            name: joint name.
            offset: local position of the joint (3 components).
        """
        self.children = []
        self.parent = None  # bvh_skeleton_node or None for the root

        self.name = name
        self.local_position = offset
        self.local_rotation_mat = None
        # NOTE(review): an all-zero quaternion is not a unit rotation; kept as
        # the historical default so existing outputs do not change.
        self.local_rotation_quat = [0,0,0,0]
        self.local_transform = None

        self.world_position = None
        self.world_rotation_mat = None
        self.world_rotation_quat = None
        self.world_transform = None

    def CalculateWorld(self):
        """Recompute local and world transforms for this node and all descendants."""
        if self.local_rotation_mat is None:
            # No rotation was set yet: derive the matrix from the stored
            # quaternion rather than handing None to compose().
            self.UpdateLocalRotation()

        self.local_transform = t3d.affines.compose(self.local_position, self.local_rotation_mat, [1,1,1])

        if self.parent is not None:
            self.world_transform = self.parent.world_transform.dot(self.local_transform)
        else:
            self.world_transform = self.local_transform

        # decompose() yields translation, rotation, zoom and shear; only the
        # first two are kept.
        self.world_position, self.world_rotation_mat, _, _ = t3d.affines.decompose(self.world_transform)
        self.world_rotation_quat = t3d.quaternions.mat2quat(self.world_rotation_mat)

        for child in self.children:
            child.CalculateWorld()

    def SetLocalRotation(self, new_rotation):
        """Store `new_rotation` (a quaternion) and refresh the rotation matrix."""
        self.local_rotation_quat = new_rotation
        self.UpdateLocalRotation()

    def UpdateLocalRotation(self):
        """Rebuild `local_rotation_mat` from `local_rotation_quat`."""
        self.local_rotation_mat = t3d.quaternions.quat2mat(self.local_rotation_quat)

    def AddChild(self, child):
        """Attach `child` to this node and return it so calls can be chained."""
        self.children.append(child)
        child.parent = self
        return child #Enables chaining

    def print_tree(self):
        """Print this node and its descendants with their parent, world and local state."""
        if self.parent is None:
            print(f'{self.name} : ROOT ')
        else:
            print(f'{self.name} : {self.parent.name}')

        print(f'\t{self.name}_world {{{self.world_position}, {self.world_rotation_quat}}}')
        print(f'\t{self.name}_local {{{self.local_position}, {self.local_rotation_quat}}}')

        for child in self.children:
            child.print_tree()

In [16]:
import glob
from tqdm.notebook import tqdm

class bvh_skeleton:
    """Skeleton built from a BVH file, able to replay frames and export features.

    `nodes` maps joint names to `bvh_skeleton_node` objects in the order the
    BVH parser reports them; `nodes_flat` is the same sequence as a list.
    """

    def __init__(self, bvh_fp = None):
        """Create the skeleton, loading the hierarchy from `bvh_fp` when given."""
        self.root = None
        self.nodes = OrderedDict()
        self.nodes_flat = []
        self.fp_bvh = bvh_fp
        self.bvh = None  # Bvh instance of the currently loaded file

        self.load_skeleton(bvh_fp)

    def _read_bvh(self, bvh_fp):
        """Remember `bvh_fp` and parse its content into `self.bvh`."""
        self.fp_bvh = bvh_fp
        with open(self.fp_bvh) as f:
            self.bvh = Bvh(f.read())

    def set_motion_source(self, bvh_fp):
        """Swap the motion data for another BVH file, keeping the existing nodes."""
        self._read_bvh(bvh_fp)

    def load_skeleton(self, bvh_fp):
        """Load `bvh_fp` and rebuild the node hierarchy from its joints.

        Does nothing when `bvh_fp` is None.
        """
        if bvh_fp is None:
            return

        self._read_bvh(bvh_fp)

        # Collect (name, offset, parent name) for every joint first.
        joint_defs = []
        for joint_name in self.bvh.get_joints_names():
            j = self.bvh.get_joint(joint_name)
            j_parent = j.parent.value[1] if len(j.parent.value) > 1 else None
            j_offset = [float(val) for val in j['OFFSET']]
            joint_defs.append((joint_name, j_offset, j_parent))

        #Construct Skeleton
        self.nodes.clear()
        for joint_name, j_offset, j_parent in joint_defs:
            j_offset[0] = -j_offset[0]  # mirror X, matching __convertToUnityPos
            new_node = bvh_skeleton_node(joint_name, j_offset)
            self.nodes[joint_name] = new_node

            if j_parent is not None:
                self.nodes[j_parent].AddChild(new_node)
            else:
                self.root = new_node

        self.nodes_flat = list(self.nodes.values())

    def __convertToUnityPos(self, v, y_up = True):
        """Flip the X sign of `v`, and also the Z sign when `y_up` is False."""
        if y_up:
            return v * [-1,1,1]

        return v * [-1,1,-1]

    def __convertToUnityQuat(self, e, y_up = True):
        """Turn three angles in degrees into a quaternion with flipped components.

        `e[0]` rotates about Z, `e[1]` about X and `e[2]` about Y; the result is
        qZ * qX * qY. The last two components are negated when `y_up` is True,
        only the last one otherwise.
        """
        qZ = t3d.quaternions.axangle2quat([0,0,1], radians(e[0]), True) #FORWARD/Z
        qX = t3d.quaternions.axangle2quat([1,0,0], radians(e[1]), True) #RIGHT/X
        qY = t3d.quaternions.axangle2quat([0,1,0], radians(e[2]), True) #UP/Y
        q = t3d.quaternions.qmult(t3d.quaternions.qmult(qZ , qX) , qY)
        #q /= t3dq.qnorm(q)

        if y_up:
            return q * [1,1,-1,-1]

        return q * [1,1,1,-1]

    def __getFeature(self, joint_name, ft:'fubot_config.feat_group.feat_type'):
        """Return the scalar described by `ft` for the joint called `joint_name`.

        Euler rotations (T == 3) are not supported: a message is printed and 0
        is returned. Any other unknown type yields None.
        """
        joint = self.nodes[joint_name]

        if ft.T == 1: #POS
            return ft.getValue(joint.local_position, joint.world_position)
        if ft.T == 2: #QROT
            return ft.getValue(joint.local_rotation_quat, joint.world_rotation_quat)
        if ft.T == 3: #ROT (euler)
            print('Euler rotations not yet supported')
            return 0

        return None

    def set_motion_frame(self,index):
        """Apply frame `index` of the loaded motion to every node.

        NOTE(review): semi-hardcoded layout — the first three values are used as
        the root position and each following triple as one joint's rotation, in
        `nodes_flat` order. Confirm this matches the channel order of the files.
        """
        frame = self.bvh.frames[index]

        #semi-hardcoded
        # Builtin float replaces the np.float alias, which newer NumPy no longer has.
        self.root.local_position = self.__convertToUnityPos(np.array(frame[0:3]).astype(float))

        for idx, n in enumerate(self.nodes_flat):
            i = 3 + (idx*3)
            q = self.__convertToUnityQuat(np.array(frame[i:i+3]).astype(float))
            n.SetLocalRotation(q)

        self.root.CalculateWorld()

    def _collect_features(self, feature_groups, size):
        """Return an array of `size` scalars read from the current pose."""
        values = np.empty(size)
        feature_id = 0
        for fg in feature_groups:
            for ft in fg.elements:
                values[feature_id] = self.__getFeature(fg.bone_name, ft)
                feature_id += 1

        return values

    def extract_features(self, config:'fubot_config'):
        """Return `(input_features, output_features)` arrays for the current pose."""
        features_input = self._collect_features(config.features_input, config.features_input_size)
        features_output = self._collect_features(config.features_output, config.features_output_size)

        return features_input, features_output

    def convert_to_training_set(self, config:'fubot_config', output_path, output_name):
        """Write every frame's features to `<output_name>_in.csv` and `<output_name>_out.csv`.

        Args:
            config: configuration providing the feature groups and headers.
            output_path: directory that receives both CSV files.
            output_name: file name prefix.
        """
        num_f_in = config.features_input_size
        num_f_out = config.features_output_size
        num_frames = len(self.bvh.frames)

        train_input = np.empty(shape=(num_frames, num_f_in))
        train_output = np.empty(shape=(num_frames, num_f_out))

        for i in range(num_frames):
            self.set_motion_frame(i)
            f_in, f_out = self.extract_features(config)

            train_input[i,:] = f_in
            train_output[i,:] = f_out

        pd.DataFrame(data=train_input, columns=config.features_input_header).to_csv(os.path.join(output_path, f'{output_name}_in.csv'), index=False, float_format='%.7f')
        pd.DataFrame(data=train_output, columns=config.features_output_header).to_csv(os.path.join(output_path, f'{output_name}_out.csv'), index=False, float_format='%.7f')

        
def generate_trainingset(config: fubot_config):
    """Convert every `*_b.bvh` chunk of a dataset into training CSV files.

    The working directory is first moved next to the config file. The dataset
    metafile `<output_root>/<output_name>/<output_name>.json` supplies the game
    ids and split categories; for each game/category pair the chunks found in
    its `bvh` folder are exported into a sibling `training` folder.
    """
    config.set_working_directory()

    params = config.params
    output_root = params['output_root']
    dataset_name = params['output_name']
    dataset_dir = os.path.join(output_root, dataset_name)

    #parse DS Metafile
    meta_path = os.path.join(dataset_dir, dataset_name + '.json')
    with open(meta_path) as meta_file:
        ds_meta = json.load(meta_file)

    games_bar = tqdm(ds_meta['game_ids'])
    for game in games_bar:
        game_name = game['name']
        games_bar.set_description(f'Processing \'{game_name}\'')

        categories_bar = tqdm(ds_meta['split_categories'])
        for category in categories_bar:
            categories_bar.set_description(f'Category > \'{category}\'')

            category_dir = os.path.join(dataset_dir, game_name, category)
            chunk_pattern = os.path.join(category_dir, 'bvh', '*_b.bvh')
            training_dir = os.path.join(category_dir, 'training')
            os.makedirs(training_dir, exist_ok=True)

            chunks_bar = tqdm(glob.glob(chunk_pattern))
            chunks_bar.set_description('BVH Chunks')

            # The hierarchy is built from the first chunk only; later chunks
            # just replace the motion data.
            skeleton = None
            for chunk_path in chunks_bar:
                name_parts = os.path.basename(chunk_path).split('_')

                if skeleton is not None:
                    skeleton.set_motion_source(chunk_path)
                else:
                    skeleton = bvh_skeleton(chunk_path)

                chunk_name = f'{name_parts[0]}_{name_parts[1]}_{name_parts[2]}'
                skeleton.convert_to_training_set(config, training_dir, chunk_name)





# skeleton = bvh_skeleton('../samples/beatsaber_0_1_b.bvh')
# config = fubot_config('../samples/generator_config.json')

# skeleton.set_motion_frame(0)
# skeleton.convert_to_training_set(config, '', 'temp')

# Build the training CSVs for the dataset described by the default 6-point configuration.
config = fubot_config(config_path='../samples/config_default6p.json')
generate_trainingset(config=config)

HBox(children=(FloatProgress(value=0.0, max=5.0), HTML(value='')))

HBox(children=(FloatProgress(value=0.0, max=3.0), HTML(value='')))

HBox(children=(FloatProgress(value=0.0, max=186.0), HTML(value='')))






KeyboardInterrupt: 

In [40]:
#rename files
import glob

def _shift_chunk_indices(target_dir, pattern):
    """Rename files in `target_dir` matching `pattern`, adding 1 to the third `_` field."""
    def chunk_key(path):
        parts = os.path.basename(path).split('_')
        return int(parts[1]), int(parts[2])

    # Highest index first, so a file is never renamed onto a name that is
    # still occupied by the next chunk.
    matches = sorted(glob.glob(os.path.join(target_dir, pattern)), key=chunk_key, reverse=True)
    for old_path in matches:
        parts = os.path.basename(old_path).split('_')
        parts[2] = str(int(parts[2]) + 1)
        # Re-join every part so names with extra '_' fields keep their tail.
        new_path = os.path.join(os.path.dirname(old_path), '_'.join(parts))

        #print(f'{old_path} >> {new_path}')
        os.rename(old_path, new_path)


def run_temp(root_directory=r'D:\_GITHUB\MGT_204354\DATASET\dataset_3p'):
    """Shift the chunk index of dataset files below `root_directory` by one.

    Every folder named `bvh` has its `*_b.bvh` files renamed and every folder
    named `training` its `*.csv` files, turning `<a>_<b>_<n>_<rest>` into
    `<a>_<b>_<n+1>_<rest>`. The operation is not idempotent: each call shifts
    the indices again.

    Args:
        root_directory: folder to walk; defaults to the original hard-coded
            dataset location (now a raw string, so the backslashes are no
            longer parsed as escape sequences).
    """
    patterns = {'bvh': '*_b.bvh', 'training': '*.csv'}

    for dir_path, child_dirs, _ in os.walk(root_directory):
        for child_dir in child_dirs:
            pattern = patterns.get(child_dir)
            if pattern is not None:
                _shift_chunk_indices(os.path.join(dir_path, child_dir), pattern)

# Runs the renaming pass as soon as this cell executes. Each execution bumps the
# chunk index of every matching file by one more, so avoid re-running the cell.
run_temp()            