In [1]:
import os
import time
import socket
import threading
import torch
import numpy as np
from datetime import datetime
from argparse import ArgumentParser
os.environ['PYGAME_HIDE_SUPPORT_PROMPT'] = "hide"
from pygame.time import Clock
import pickle

from articulate.math import *
from mobileposer.models import *
from mobileposer.utils.model_utils import *
from mobileposer.config import *

import coremltools as ct
import pytorch_lightning as pl

In [2]:
from mobileposer.models.rnn import RNN
class JointsBase(nn.Module):
    """
    Inputs: N IMUs.
    Outputs: 24 Joint positions. 
    """

    def __init__(self, n_imu, seq_length):
        super().__init__()
        # self.joints = net.joints.joints
        self.joints = RNN(n_imu, 24 * 3, 256, seq_length)

    def forward(self, batch, input_lengths: Tensor):
        # forward joint model
        joints, _, _ = self.joints(batch)
        return joints

class PoserBase(nn.Module):
    """
    Inputs: N IMUs.
    Outputs: SMPL Pose Parameters (as 6D Rotations).
    """
    def __init__(self, n_output_joints, n_imu, n_reduced, seq_length):
        super().__init__()
        # self.pose = net.pose.pose
        self.pose = RNN(n_output_joints*3 + n_imu, n_reduced*6, 256, seq_length)

    def forward(self, batch, input_lengths: Tensor):
        # forward the pose prediction model
        pred_pose, _, _ = self.pose(batch)
        return pred_pose

class VelocityBase(nn.Module):
    """
    Inputs: N IMUs.
    Outputs: Per-Frame Root Velocity. 
    """

    def __init__(self, n_output_joints, n_imu, seq_length):
        super().__init__()

        # model definitions
        # self.vel = net.velocity.vel
        self.vel = RNN(n_output_joints * 3 + n_imu, 24 * 3, 256, bidirectional=False, seq_length=seq_length)

    def forward(self, batch, h, c, input_lengths:Tensor):
        # forward velocity model
        vel, _, state = self.vel(batch, (h,c))
        h_out, c_out = state[0].detach(), state[1].detach()
        return vel, h_out, c_out
    
class FootContactBase(nn.Module):
    """
    Inputs: N IMUs.
    Outputs: Foot Contact Probability ([s_lfoot, s_rfoot]).
    """

    def __init__(self, n_output_joints, n_imu, seq_length):
        super().__init__()
        # self.footcontact = net.foot_contact.footcontact
        self.footcontact = RNN(n_output_joints * 3 + n_imu, 2, 64, seq_length=seq_length)

    def forward(self, batch, input_lengths: Tensor):
        # forward foot contact model
        foot_contact, _, _ = self.footcontact(batch)
        return foot_contact

In [3]:
class MobilePoserBase(nn.Module):
    # def __init__(self, net, body_model, joints_model, pose_model, contact_model, velocity_model, n_reduced, ignored):
    #     super().__init__()

    #     #constants
    #     self.n_reduced = n_reduced
    #     self.ignored = ignored

    #     #core model layers
    #     self.bodymodel = body_model
    #     self.joints = joints_model
    #     self.pose = pose_model
    #     self.foot_contact = contact_model
    #     self.velocity = velocity_model
    def __init__(self, n_reduced, ignored, n_imu, n_output_joints, seq_length):
        super().__init__()

        #constants
        self.n_reduced = n_reduced
        self.ignored = ignored

        #core model layers
        self.joints = JointsBase(n_imu=n_imu, seq_length=seq_length)
        self.pose = PoserBase(n_imu=n_imu, n_output_joints=n_output_joints, n_reduced=n_reduced, seq_length=seq_length)
        self.foot_contact = FootContactBase(n_output_joints=n_output_joints, n_imu=n_imu, seq_length=seq_length)
        self.velocity = VelocityBase(n_imu=n_imu, n_output_joints=n_output_joints, seq_length=seq_length)
    
    def global_to_local_pose(self, pose: torch.Tensor) -> torch.Tensor:
        # this runs in Python only
        return self.bodymodel.inverse_kinematics_R(pose)

    def forward(self, batch, h, c, input_lengths):
        # forward the joint prediction model
        pred_joints = self.joints(batch, input_lengths)

        # forward the pose prediction model
        pose_input = torch.cat((pred_joints, batch), dim=-1)
        pred_pose = self.pose(pose_input, input_lengths)

        # forward the foot-ground contact probability model
        tran_input = torch.cat((pred_joints, batch), dim=-1)
        foot_contact = self.foot_contact(tran_input, input_lengths)

        # foward the foot-joint velocity model
        pred_vel, velocity_h, velocity_c = self.velocity(tran_input, h, c, input_lengths)
        pred_vel = pred_vel.squeeze(0)

        pred_pose, pred_joints, pred_vel, foot_contact = self.process_base_outputs(pred_pose, pred_joints, pred_vel, foot_contact)

        return pred_pose, pred_joints, pred_vel, foot_contact, velocity_h, velocity_c
    
    def process_base_outputs(self, pose, pred_joints, vel, contact):
        
        pose = art.math.r6d_to_rotation_matrix(pose).view(-1, 24, 3, 3)

        # get pose
        pose = pose[40].view(-1, 9) #hardcoded num_past_frames = 

        # compute the joint positions from predicted pose
        joints = pred_joints.squeeze(0)[40].view(24, 3)

        # compute translation from foot-contact probability
        contact = contact[0][40]

        # velocity from network-based estimation
        root_vel = vel.view(-1, 24, 3)[:, 0]
        pred_vel = root_vel[40] / (30/2) #hardcoded fps = 30, vel_scale = 2

        # Need to implement in Swift

        # lfoot_pos, rfoot_pos = joints[10], joints[11]
        # if contact[0] > contact[1]:
        #     contact_vel = self.last_lfoot_pos - lfoot_pos + self.gravity_velocity
        # else:
        #     contact_vel = self.last_rfoot_pos - rfoot_pos + self.gravity_velocity
        # weight = self._prob_to_weight(contact.max())
        # velocity = art.math.lerp(pred_vel, contact_vel, weight)
        # current_foot_y = self.current_root_y + min(lfoot_pos[1].item(), rfoot_pos[1].item())
        # if current_foot_y + velocity[1].item() <= self.floor_y:
        #     velocity[1] = self.floor_y - current_foot_y

        # self.current_root_y += velocity[1].item()
        # self.last_lfoot_pos, self.last_rfoot_pos = lfoot_pos, rfoot_pos
        # self.last_root_pos += velocity

        return pose, joints, pred_vel, contact

In [4]:
base = MobilePoserBase(
                    # n_reduced=joint_set.n_full,
                    n_reduced=joint_set.n_reduced,
                    ignored=joint_set.ignored,
                    n_imu=model_config.n_imu,
                    n_output_joints=model_config.n_output_joints,
                    seq_length=torch.tensor([model_config.past_frames+model_config.future_frames]))

In [6]:
# base.load_state_dict(torch.load("/Users/brianchen/Research/MobilePoser/mobileposer/checkpoints/model_finetuned.pth", map_location="cpu"))
base.load_state_dict(torch.load("/Users/brianchen/Research/MobilePoser/mobileposer/checkpoints/weights.pth"))
base.eval()

MobilePoserBase(
  (joints): JointsBase(
    (joints): RNN(
      (rnn): LSTM(256, 256, num_layers=2, batch_first=True, bidirectional=True)
      (linear1): Linear(in_features=60, out_features=256, bias=True)
      (linear2): Linear(in_features=512, out_features=72, bias=True)
      (dropout): Dropout(p=0.4, inplace=False)
    )
  )
  (pose): PoserBase(
    (pose): RNN(
      (rnn): LSTM(256, 256, num_layers=2, batch_first=True, bidirectional=True)
      (linear1): Linear(in_features=132, out_features=256, bias=True)
      (linear2): Linear(in_features=512, out_features=96, bias=True)
      (dropout): Dropout(p=0.4, inplace=False)
    )
  )
  (foot_contact): FootContactBase(
    (footcontact): RNN(
      (rnn): LSTM(64, 64, num_layers=2, batch_first=True, bidirectional=True)
      (linear1): Linear(in_features=132, out_features=64, bias=True)
      (linear2): Linear(in_features=128, out_features=2, bias=True)
      (dropout): Dropout(p=0.4, inplace=False)
    )
  )
  (velocity): Veloci

In [7]:
scripted_core = torch.jit.script(base)
scripted_core.eval()

RecursiveScriptModule(
  original_name=MobilePoserBase
  (joints): RecursiveScriptModule(
    original_name=JointsBase
    (joints): RecursiveScriptModule(
      original_name=RNN
      (rnn): RecursiveScriptModule(original_name=LSTM)
      (linear1): RecursiveScriptModule(original_name=Linear)
      (linear2): RecursiveScriptModule(original_name=Linear)
      (dropout): RecursiveScriptModule(original_name=Dropout)
    )
  )
  (pose): RecursiveScriptModule(
    original_name=PoserBase
    (pose): RecursiveScriptModule(
      original_name=RNN
      (rnn): RecursiveScriptModule(original_name=LSTM)
      (linear1): RecursiveScriptModule(original_name=Linear)
      (linear2): RecursiveScriptModule(original_name=Linear)
      (dropout): RecursiveScriptModule(original_name=Dropout)
    )
  )
  (foot_contact): RecursiveScriptModule(
    original_name=FootContactBase
    (footcontact): RecursiveScriptModule(
      original_name=RNN
      (rnn): RecursiveScriptModule(original_name=LSTM)
      

In [27]:
bodymodel = art.model.ParametricModel(paths.smpl_file)
bodymodel.get_zero_pose_joint_and_vertex()
j, _ = bodymodel.get_zero_pose_joint_and_vertex()
feet_pos = j[10:12].clone()
floor_y = j[10:12, 1].min().item()

In [29]:
feet_pos

tensor([[ 0.1283, -0.9559,  0.0750],
        [-0.1194, -0.9564,  0.0774]])

In [31]:
last_lfoot_pos, last_rfoot_pos = (pos for pos in feet_pos)

In [34]:
floor_y

-0.9563523530960083

In [8]:
num_past_frames = model_config.past_frames
num_future_frames = model_config.future_frames
num_total_frames = num_past_frames + num_future_frames
data = torch.zeros((60))
imu = data.repeat(num_total_frames, 1)

input_length = torch.tensor([num_total_frames])
imu_input = imu.unsqueeze(0)
h, c = (torch.zeros((2, 1, 256)), torch.zeros((2, 1, 256)))

In [9]:
base.eval()
with torch.no_grad():
    out = base(imu_input, h, c, input_length)

IndexError: index 40 is out of bounds for dimension 0 with size 30

In [None]:
scripted_core = scripted_core.eval()
traced_core = torch.jit.trace(base, (imu_input, h, c, input_length))
model_from_trace = ct.convert(
    traced_core,
    inputs=[ct.TensorType(shape=imu_input.shape), ct.TensorType(shape=h.shape), ct.TensorType(shape=c.shape), ct.TensorType(shape=input_length.shape)],
)

When both 'convert_to' and 'minimum_deployment_target' not specified, 'convert_to' is set to "mlprogram" and 'minimum_deployment_target' is set to ct.target.iOS15 (which is same as ct.target.macOS12). Note: the model will not run on systems older than iOS15/macOS12/watchOS8/tvOS15. In order to make your model run on older system, please set the 'minimum_deployment_target' to iOS14/iOS13. Details please see the link: https://apple.github.io/coremltools/docs-guides/source/target-conversion-formats.html
Tuple detected at graph output. This will be flattened in the converted model.
Converting PyTorch Frontend ==> MIL Ops:  99%|█████████▉| 212/214 [00:00<00:00, 1620.72 ops/s]
Running MIL frontend_pytorch pipeline: 100%|██████████| 5/5 [00:00<00:00, 106.53 passes/s]
Running MIL default pipeline: 100%|██████████| 89/89 [00:01<00:00, 80.40 passes/s]
Running MIL backend_mlprogram pipeline: 100%|██████████| 12/12 [00:00<00:00, 268.82 passes/s]


In [10]:
# Save model as a Core ML model package
output_descriptions = [("pred_pose", "global pose output, need to convert global reduced to local full pose"),
                                        ("pred_joints", "joint predictions"),
                                         ("pred_vel", "velocity predictions"),
                                          ("foot_contact", "foot contact prediction"),
                                           ("velocity_h", "hidden states for velocity lstm"),
                                           ("velocity_c", "initial cell states for velocity lstm")]
model_from_trace.save("MobilePoser.mlpackage")
# Load the saved model
loaded_model = ct.models.MLModel("MobilePoser.mlpackage")

In [11]:
spec = model_from_trace.get_spec()

In [12]:
for out, (name, desc) in zip(model_from_trace._spec.description.output, output_descriptions):
    out.shortDescription = desc

In [14]:
from pytorch3d.transforms import quaternion_to_matrix, matrix_to_quaternion

class Quaternion2Matrix(torch.nn.Module):
    def __init__(self):
        super().__init__()
    
    def quaternion_to_matrix(self, quaternions: torch.Tensor) -> torch.Tensor:

        r, i, j, k = torch.unbind(quaternions, -1)
        # pyre-fixme[58]: `/` is not supported for operand types `float` and `Tensor`.
        two_s = 2.0 / (quaternions * quaternions).sum(-1)

        o = torch.stack(
            (
                1 - two_s * (j * j + k * k),
                two_s * (i * j - k * r),
                two_s * (i * k + j * r),
                two_s * (i * j + k * r),
                1 - two_s * (i * i + k * k),
                two_s * (j * k - i * r),
                two_s * (i * k - j * r),
                two_s * (j * k + i * r),
                1 - two_s * (i * i + j * j),
            ),
            -1,
        )
        return o.reshape(quaternions.shape[:-1] + (3, 3))

    def forward(self, quat):
        return self.quaternion_to_matrix(quat)
        # # ori, calibration_quats: (..., 4)  quats in (x, y, z, w) order
        # # acc:                   (..., 3)
        # # device_id:             (1,)  – choose which calibration quat to use

        # device_mean_quat = calibration_quats     # stays a tensor

        # og_mat   = self.quaternion_to_matrix(ori)                   # (..., 3, 3)
        # global_if = self.quaternion_to_matrix(device_mean_quat)     # (3, 3)
        # return og_mat, global_if

        # global_mat = torch.matmul(global_if.T, og_mat)         # R_g←s
        # global_ori = matrix_to_quaternion(global_mat)          # (..., 4)

        # acc_ref   = torch.matmul(og_mat, acc.unsqueeze(-1)).squeeze(-1)
        # global_acc = torch.matmul(global_if.T, acc_ref.unsqueeze(-1)).squeeze(-1)

        # return global_ori, global_acc

In [15]:
quat2matrix_func = Quaternion2Matrix()

In [16]:
ori_input = torch.ones((4))
acc_input = torch.ones((3))
calibration_quat = torch.ones((4))
traced_func = torch.jit.trace(quat2matrix_func, example_inputs=(ori_input))

In [17]:
quat2matrix_func_from_trace = ct.convert(
    traced_func,
    inputs=[ct.TensorType(shape=ori_input.shape)],
    convert_to="mlprogram")
quat2matrix_func_from_trace.save("Quat2Matrix.mlpackage")

Model is not in eval mode. Consider calling '.eval()' on your model prior to conversion
Converting PyTorch Frontend ==> MIL Ops:  99%|█████████▊| 72/73 [00:00<00:00, 3658.53 ops/s]
Running MIL frontend_pytorch pipeline: 100%|██████████| 5/5 [00:00<00:00, 1108.67 passes/s]
Running MIL default pipeline: 100%|██████████| 89/89 [00:00<00:00, 665.85 passes/s]
Running MIL backend_mlprogram pipeline: 100%|██████████| 12/12 [00:00<00:00, 1553.40 passes/s]


In [22]:
from pytorch3d.transforms import quaternion_to_matrix, matrix_to_quaternion, standardize_quaternion

class Sensor2Global(torch.nn.Module):
    def __init__(self):
        super().__init__()
    
    def quaternion_to_matrix(self, quaternions: torch.Tensor) -> torch.Tensor:

        r, i, j, k = torch.unbind(quaternions, -1)
        # pyre-fixme[58]: `/` is not supported for operand types `float` and `Tensor`.
        two_s = 2.0 / (quaternions * quaternions).sum(-1)

        o = torch.stack(
            (
                1 - two_s * (j * j + k * k),
                two_s * (i * j - k * r),
                two_s * (i * k + j * r),
                two_s * (i * j + k * r),
                1 - two_s * (i * i + k * k),
                two_s * (j * k - i * r),
                two_s * (i * k - j * r),
                two_s * (j * k + i * r),
                1 - two_s * (i * i + j * j),
            ),
            -1,
        )
        return o.reshape(quaternions.shape[:-1] + (3, 3))
    

    def forward(self, ori, acc, calibration_quats):
        # # ori, calibration_quats: (..., 4)  quats in (x, y, z, w) order
        # # acc:                   (..., 3)
        # # device_id:             (1,)  – choose which calibration quat to use

        device_mean_quat = calibration_quats     # stays a tensor

        og_mat   = self.quaternion_to_matrix(ori)                   # (..., 3, 3)
        global_if = self.quaternion_to_matrix(device_mean_quat)     # (3, 3)

        global_mat = torch.matmul(global_if.T, og_mat)         # R_g←s
        global_ori = matrix_to_quaternion_single(global_mat)          # (..., 4)
        acc_ref   = torch.matmul(og_mat, acc.unsqueeze(-1)).squeeze(-1)
        global_acc = torch.matmul(global_if.T, acc_ref.unsqueeze(-1)).squeeze(-1)

        return global_ori, global_acc

In [23]:
sensor2global_func = Sensor2Global()
sensor2global_func.eval()

Sensor2Global()

In [25]:
traced_func = torch.jit.trace(sensor2global_func, example_inputs=(ori_input, acc_input, calibration_quat))
sensor2global_func_from_trace = ct.convert(
    traced_func,
    inputs=[ct.TensorType(shape=ori_input.shape), ct.TensorType(shape=acc_input.shape), ct.TensorType(shape=calibration_quat.shape)],
    convert_to="mlprogram")
quat2matrix_func_from_trace.save("Sensor2Global.mlpackage")

Tuple detected at graph output. This will be flattened in the converted model.
Converting PyTorch Frontend ==> MIL Ops:  99%|█████████▉| 336/338 [00:00<00:00, 5293.61 ops/s]
Running MIL frontend_pytorch pipeline: 100%|██████████| 5/5 [00:00<00:00, 277.02 passes/s]
Running MIL default pipeline: 100%|██████████| 89/89 [00:00<00:00, 152.30 passes/s]
Running MIL backend_mlprogram pipeline: 100%|██████████| 12/12 [00:00<00:00, 284.24 passes/s]


In [21]:
import torch

def matrix_to_quaternion_single(matrix: torch.Tensor) -> torch.Tensor:
    """
    Args
    ----
    matrix : torch.Tensor, shape (3, 3)
             Rotation matrix  (rows are destination axes).

    Returns
    -------
    quat   : torch.Tensor, shape (4,)  --  (w, x, y, z) with ||quat|| == 1
    """

    # ------------------------------
    # 1.  Pull out the nine elements
    # ------------------------------
    m00, m01, m02 = matrix[0, 0], matrix[0, 1], matrix[0, 2]
    m10, m11, m12 = matrix[1, 0], matrix[1, 1], matrix[1, 2]
    m20, m21, m22 = matrix[2, 0], matrix[2, 1], matrix[2, 2]

    one = matrix.new_tensor(1.0)

    # -------------------------------------------------------
    # 2.  Same four “absolute” terms as the original function
    # -------------------------------------------------------
    q_abs = torch.sqrt(
        torch.clamp(
            torch.stack(
                [
                    one + m00 + m11 + m22,        # w‑candidate
                    one + m00 - m11 - m22,        # x‑candidate
                    one - m00 + m11 - m22,        # y‑candidate
                    one - m00 - m11 + m22,        # z‑candidate
                ]
            ),
            min=0.0,
        )
    )                                           # shape (4,)

    quat_by_rijk = torch.stack(
        [
            torch.stack([q_abs[0] ** 2,
                         m21 - m12,
                         m02 - m20,
                         m10 - m01]),
            torch.stack([m21 - m12,
                         q_abs[1] ** 2,
                         m10 + m01,
                         m02 + m20]),
            torch.stack([m02 - m20,
                         m10 + m01,
                         q_abs[2] ** 2,
                         m12 + m21]),
            torch.stack([m10 - m01,
                         m20 + m02,
                         m21 + m12,
                         q_abs[3] ** 2]),
        ],
        dim=0,                                   # shape (4, 4)
    )

    # ---------------------------------------------------
    # 4.  Divide each row by the same “safe” denominator
    # ---------------------------------------------------
    floor = matrix.new_tensor(0.1)
    denom = 2.0 * torch.max(q_abs, floor)        # shape (4,)
    quat_candidates = quat_by_rijk / denom.unsqueeze(-1)

    # ----------------------------------------------------------
    # 5.  Pick the best‑conditioned row (largest q_abs element)
    #     – use tensor‑indexing so the tracer records a gather
    # ----------------------------------------------------------
    best_idx = torch.argmax(q_abs)               # tensor scalar
    quat = quat_candidates[best_idx]             # shape (4,)

    # 6.  Normalize to exactly unit length and return (w, x, y, z)
    return quat / quat.norm(p=2)


In [75]:
import torch
import math
from typing import List
import cv2

import torch
import torch.nn as nn

class RotationMatrixToAxisAngle(nn.Module):
    def forward(self, r: torch.Tensor) -> torch.Tensor:
        """
        :param r: Tensor of shape (..., 3, 3), a batch of rotation matrices
        :return: Tensor of shape (..., 3), the corresponding axis-angle vectors
        """
        # Flatten batch dims
        R = r.view(-1, 3, 3)

        # 1) compute the trace → cos θ
        tr = R[..., 0, 0] + R[..., 1, 1] + R[..., 2, 2]
        cos_theta = (tr - 1.0) * 0.5
        cos_theta = cos_theta.clamp(-1.0, 1.0)

        # 2) recover θ
        theta = torch.acos(cos_theta)

        # 3) compute the "cross-differences" v = [R32-R23, R13-R31, R21-R12]
        rx = R[..., 2, 1] - R[..., 1, 2]
        ry = R[..., 0, 2] - R[..., 2, 0]
        rz = R[..., 1, 0] - R[..., 0, 1]
        v   = torch.stack((rx, ry, rz), dim=-1)

        # 4) normalize to get the rotation axis: axis = v / (2 sin θ)
        sin_theta = torch.sin(theta).clamp(min=1e-6).unsqueeze(-1)
        axis = v / (2.0 * sin_theta)

        # 5) axis-angle vector = axis * θ
        rot_vec = axis * theta.unsqueeze(-1)

        # un-flatten to (..., 3)
        return rot_vec.view(*r.shape[:-2], 3)


class RodriguesFunc(nn.Module):
    def __init__(self):
        super().__init__()

    def forward(self, r: torch.Tensor):
        r"""
        Turn rotation matrices into axis-angles. (torch, batch)

        :param r: Rotation matrix tensor that can reshape to [batch_size, 3, 3].
        :return: Axis-angle tensor of shape [batch_size, 3].
        """
        result = [cv2.Rodrigues(_)[0] for _ in r.clone().detach().cpu().view(-1, 3, 3).numpy()]
        result = torch.from_numpy(np.stack(result)).float().squeeze(-1)
        return result

In [73]:
test_tensor = torch.rand_like(torch.zeros((3,3)))

In [79]:
new = RotationMatrixToAxisAngle()
rotation_func = RodriguesFunc()

print(new(test_tensor))
rotation_func(test_tensor)

tensor([0.3668, 0.4553, 0.4322])


tensor([[0.8145, 0.8596, 0.7993]])

In [None]:

rotation_func.eval()

traced_func = torch.jit.trace(rotation_func, example_inputs=(test_tensor))
rodrigues_func_from_trace = ct.convert(
    traced_func,
    inputs=[ct.TensorType(shape=test_tensor.shape)],
    convert_to="mlprogram")

  result = [cv2.Rodrigues(_)[0] for _ in r.clone().detach().cpu().view(-1, 3, 3).numpy()]


RuntimeError: Tracer cannot infer type of [[0.814464   0.85956454 0.7993293 ]]
:Only tensors and (possibly nested) tuples of tensors, lists, or dictsare supported as inputs or outputs of traced functions, but instead got value of type ndarray.

In [71]:
rodrigues_func_from_trace.predict({"r": test_tensor})

{'const_1': array([[1.7409908, 1.4804977, 1.3079976]], dtype=float32)}