In [1]:
%load_ext autoreload
%autoreload 2

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import skeletonDefMixamo as skd
from torch.utils.data import TensorDataset, DataLoader
from tqdm import tqdm

from PhaseFunctionedNetwork import PhaseFunctionedNetwork

# set seeds for reproduceability
torch.manual_seed(42)
np.random.seed(42)
rng = np.random.RandomState(42)

In [2]:
# load data
# X = np.float32(np.loadtxt('./data/Input.txt'))
# Y = np.float32(np.loadtxt('./data/Output.txt'))
# P = np.float32(np.loadtxt('./data/Phases.txt'))
# print(X.shape, Y.shape, P.shape)

In [3]:
def get_phases(X_input_arr):
    phase = np.linspace(0, 2*np.pi, num=X_input_arr.shape[0])
    return phase

In [4]:
# Process Data (Note currently this data is NOT MIRRORD, CAN DOUBLE WITH MIRRORING)

# Idle Data
X_idle = np.float32(np.loadtxt('C:/Users/Ana/Desktop/dev/pfnn-dev/Export/Input_Idle.txt'))
Y_idle = np.float32(np.loadtxt('C:/Users/Ana/Desktop/dev/pfnn-dev/Export/Output_Idle.txt'))
P_idle = get_phases(X_idle)
print(f"Idle data shape: {X_idle.shape}, {Y_idle.shape}, {P_idle.shape}")

# Big Forward Jump
X_jump_for_BIG = np.float32(np.loadtxt('C:/Users/Ana/Desktop/dev/pfnn-dev/Export/Input_jump_for_BIG.txt'))
Y_jump_for_BIG = np.float32(np.loadtxt('C:/Users/Ana/Desktop/dev/pfnn-dev/Export/Output_jump_for_BIG.txt'))
P_jump_for_BIG = get_phases(X_jump_for_BIG)
print(f"Jump Forward BIG data shape: {X_jump_for_BIG.shape}, {Y_jump_for_BIG.shape}, {P_jump_for_BIG.shape}")

# Small Forward Jump
X_jump_for_SMALL = np.float32(np.loadtxt('C:/Users/Ana/Desktop/dev/pfnn-dev/Export/Input_jump_for_SMALL.txt'))
Y_jump_for_SMALL = np.float32(np.loadtxt('C:/Users/Ana/Desktop/dev/pfnn-dev/Export/Output_jump_for_SMALL.txt'))
P_jump_for_SMALL = get_phases(X_jump_for_SMALL)
print(f"Jump Forward SMALL data shape: {X_jump_for_SMALL.shape}, {Y_jump_for_SMALL.shape}, {P_jump_for_SMALL.shape}")

Idle data shape: (995, 888), (995, 819), (995,)
Jump Forward BIG data shape: (169, 888), (169, 819), (169,)
Jump Forward SMALL data shape: (112, 888), (112, 819), (112,)


In [5]:
# Concatenate Data into indivudal X, Y, P Arrays
X = np.concatenate((X_idle, X_jump_for_BIG, X_jump_for_SMALL))
print(f"X: {X.shape}")

Y = np.concatenate((Y_idle, Y_jump_for_BIG, Y_jump_for_SMALL))
print(f"Y: {Y.shape}")

P = np.concatenate((P_idle, P_jump_for_BIG, P_jump_for_SMALL))
print(f"P: {P.shape}")

# Debugging only use idle
# X = X_idle
# Y = Y_idle
# P = P_idle

X: (1276, 888)
Y: (1276, 819)
P: (1276,)


In [6]:
# calculate mean and std
Xmean, Xstd = X.mean(axis=0), X.std(axis=0)
Ymean, Ystd = Y.mean(axis=0), Y.std(axis=0)

print(f"X mean: {Xmean.shape}")
print(f"X std: {Xstd.shape}")
print(f"Y mean: {Ymean.shape}")
print(f"Y std: {Ystd.shape}")

X mean: (888,)
X std: (888,)
Y mean: (819,)
Y std: (819,)


In [7]:
def contains_any(num_list, str):
    return any(num in str for num in num_list)

In [8]:
# Parser of Labels
labels_path = "C:/Users/Ana/Desktop/dev/pfnn-dev/Export/InputLabels_Idle.txt"

TRAJ_STR = 'Trajectory'
POS_STR = 'Position'
DIR_STR = 'Direction'
GAIT_STR = 'Style'
SPEED_STR = 'Speed'
VEL_STR = 'Velocity'
JOINT_STR = 'Bone'
FORWARD_STR = 'Forward'
UP_STR = 'Up'

with open(labels_path, 'r') as file:
    lines = file.readlines()

past_indices = [str(i) for i in range(0,6)]

# All the index arrays to populate for calculating means/std

# Trajectory
traj_past_pos_idx = []
traj_future_pos_idx = []
traj_past_dir_idx = []
traj_future_dir_idx = []
traj_past_vel_idx = []
traj_future_vel_idx = []
traj_gait_idx = []
traj_speed_idx = []

# Joints
joint_pos = []
joint_forward = []
joint_up = []
joint_vel = []

for line in lines:
    # Split the line at the first space character to separate index and name
    index, name = line.split(maxsplit=1)
    index = int(index.strip('[]'))  # Remove the square brackets and convert to int
    name = name.strip()  # Remove any surrounding whitespace/newline characters
    
    # Trajectory parsing logic
    if TRAJ_STR in name:
        if POS_STR in name:
            if contains_any(past_indices, name):
                # Past Positions
                traj_past_pos_idx.append(index)
            else:
                # Future Positions
                traj_future_pos_idx.append(index)
        if DIR_STR in name:
            if contains_any(past_indices, name):
                # Past Directions
                traj_past_dir_idx.append(index)
            else:
                # Future Directions
                traj_future_dir_idx.append(index)
        if VEL_STR in name:
            if contains_any(past_indices, name):
                # Past Velocities
                traj_past_vel_idx.append(index)
            else:
                # Future Velocities
                traj_future_vel_idx.append(index)
        if GAIT_STR in name:
            # Gait
            traj_gait_idx.append(index)
        if SPEED_STR in name:
            # Speed
            traj_speed_idx.append(index)
    if JOINT_STR in name:
        if POS_STR in name:
            # Positions X, Y and Z
            joint_pos.append(index)
        if FORWARD_STR in name:
            # Forward translation X, Y and Z
            joint_forward.append(index)
        if UP_STR in name:
            # Upward translations X, Y and Z
            joint_up.append(index)
        if VEL_STR in name:
            # Velocity X, Y and Z
            joint_vel.append(index)

# Debugging purposes
# print(traj_past_pos_idx)
# print(traj_future_pos_idx)
# print(traj_past_dir_idx)
# print(traj_future_dir_idx)
# print(traj_gait_idx)
# print(traj_speed_idx)
# print(traj_past_vel_idx)
# print(traj_future_vel_idx)
# print(joint_pos)
# print(joint_forward)
# print(joint_up)

In [9]:
# Preprocess Input Data
Xstd[traj_past_pos_idx] = Xstd[traj_past_pos_idx].mean()  # Trajectory Past Positions
Xstd[traj_future_pos_idx] = Xstd[traj_future_pos_idx].mean()  # Trajectory Future Positions
Xstd[traj_past_dir_idx] = Xstd[traj_past_dir_idx].mean()  # Trajectory Past Directions
Xstd[traj_future_dir_idx] = Xstd[traj_future_dir_idx].mean()  # Trajectory Future Direction
Xstd[traj_past_vel_idx] = Xstd[traj_past_vel_idx].mean()  # Trajectory Past Velocities
Xstd[traj_future_vel_idx] = Xstd[traj_future_vel_idx].mean()  # Trajectory Future Velocities
Xstd[traj_speed_idx] = Xstd[traj_speed_idx].mean()  # Trajectory Speed
Xstd[traj_gait_idx] = Xstd[traj_gait_idx].mean()  # Trajectory Gait

# mask out unused joints in input
joint_weights = np.array(skd.JOINT_WEIGHTS).repeat(3) # Repeat 3 times for X, Y and Z

Xstd[joint_pos] = Xstd[joint_pos].mean() / (joint_weights * 0.1) # Joint Positions
Xstd[joint_vel] = Xstd[joint_vel].mean() / (joint_weights * 0.1) # Joint Velocities
Xstd[joint_forward] = Xstd[joint_forward].mean() / (joint_weights * 0.1) # Joint Forward Translation
Xstd[joint_up] = Xstd[joint_up].mean() / (joint_weights * 0.1) # Joint Upward Translation

In [10]:
# Parser of Labels
labels_path = 'C:/Users/Ana/Desktop/dev/pfnn-dev/Export/OutputLabels_Idle.txt'

ROOT_STR = 'Root'
TRANSLATION_STR = 'Translation'

with open(labels_path, 'r') as file:
    lines = file.readlines()

past_indices = [str(i) for i in range(0,6)]

# All the index arrays to populate for calculating means/std

# Trajectory
traj_pos_idx = []
traj_dir_idx = []
traj_vel_idx = []

# Joints
joint_pos = []
joint_forward = []
joint_up = []
joint_vel = []

# Root
root_translation = []

for line in lines:
    # Split the line at the first space character to separate index and name
    index, name = line.split(maxsplit=1)
    index = int(index.strip('[]'))  # Remove the square brackets and convert to int
    name = name.strip()  # Remove any surrounding whitespace/newline characters
    
    # Trajectory parsing logic
    if TRAJ_STR in name:
        if POS_STR in name:
            traj_pos_idx.append(index)
        if DIR_STR in name:
            traj_dir_idx.append(index)
        if VEL_STR in name:
            traj_vel_idx.append(index)
    # Joint parsing logic
    if JOINT_STR in name:
        if POS_STR in name:
            joint_pos.append(index)
        if FORWARD_STR in name:
            joint_forward.append(index)
        if UP_STR in name:
            joint_up.append(index)
        if VEL_STR in name:
            joint_vel.append(index)
    # Root parsin logic
    if ROOT_STR in name:
        if TRANSLATION_STR in name:
            root_translation.append(index)

# Debugging Purposes
print(root_translation)

[816, 818]


In [11]:
# Preprocess Output Data

# Trajectory
Ystd[traj_pos_idx] = Ystd[traj_pos_idx].mean()
Ystd[traj_dir_idx] = Ystd[traj_dir_idx].mean()
Ystd[traj_vel_idx] = Ystd[traj_vel_idx].mean()

# Joints
Ystd[joint_pos] = Ystd[joint_pos].mean()
Ystd[joint_forward] = Ystd[joint_forward].mean()
Ystd[joint_up] = Ystd[joint_up].mean()
Ystd[joint_vel] = Ystd[joint_vel].mean()

# Root
Ystd[root_translation] = Ystd[root_translation].mean()

# save mean / std / min / max

Xmean.astype(np.float32).tofile('./weights/Xmean.bin')
Ymean.astype(np.float32).tofile('./weights/Ymean.bin')
Xstd.astype(np.float32).tofile('./weights/Xstd.bin')
Ystd.astype(np.float32).tofile('./weights/Ystd.bin')

# normalize data
X = (X - Xmean) / Xstd
Y = (Y - Ymean) / Ystd

In [12]:
# append phase as additional feature
input = torch.tensor(np.concatenate([X, P [..., np.newaxis]], axis=-1))
target = torch.tensor(Y)

print(f"Input shape {input.shape}")
print(f"Target shape {target.shape}")

dataset = TensorDataset(input, target)

BATCH_SIZE = 32
train_dataloader = DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True)

Input shape torch.Size([1276, 889])
Target shape torch.Size([1276, 819])


In [13]:
# ensure GPU is available
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# print(f"Using device: {device}")
device = "cuda"

# custom loss function
def loss_func(output, target, model):
    loss = torch.mean((output - target)**2) + model.cost()
    return loss

In [14]:
model = PhaseFunctionedNetwork(input_shape=input.shape[1], output_shape=target.shape[1])
model.to(device)

optimizer = torch.optim.AdamW(model.parameters(), lr=0.0001)

epochs=1

for epoch in range(epochs):
    model.train()
    loss_list = []
    for i, batch in enumerate(tqdm(train_dataloader)):
        input, target = batch
        input, target = input.to(device), target.to(device)

        # forward pass
        output = model(input)
        loss = loss_func(output, target, model)
        loss_list.append(loss.item())

        # backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    print(f'Epoch [{epoch+1}/{epochs}], Loss: {np.average(loss_list)}')


100%|██████████| 40/40 [00:03<00:00, 10.90it/s]

Epoch [1/1], Loss: 0.26744564927094894





In [17]:
# save weights
model.precompute_and_save_weights()