In [None]:
import time 

import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import pinocchio as pin
from scipy.optimize import least_squares
from scipy.special import huber
from projectyl.utils.properties import ELBOW, SHOULDER, WRIST
from projectyl.utils.arm import plot_optimization_curves

from projectyl.dynamics.inverse_kinematics import build_arm_model
from projectyl.utils.arm import interactive_replay_sequence
from projectyl.utils.io import Dump
from interactive_pipe.helper import _private
from typing import List, Tuple, Dict, Any
_private.registered_controls_names = [] # this is for notebooks where you re-execute cells everytime.

%load_ext autoreload
%autoreload 2
global_params = {}
build_arm_model(global_params, headless=False, free_elbow=True)
arm_robot = global_params["arm"]
viz = global_params["viz"]  

In [None]:
arm_robot.model.createData()
viz.display(arm_robot.q0)

TESTS arm moving, random constant acceleration

In [None]:
from projectyl.dynamics.simulation import build_simulation
DT = 1e-2
OS_FACTOR = 100
T = 30
out = build_simulation(arm_robot, T=T*OS_FACTOR, DT=DT/OS_FACTOR)
# rec_ground_truth_q, rec_ground_truth_vq, rec_ground_truth_aq, rec_ground_truth_tauq, rec_ground_truth_shoulder_p, rec_ground_truth_elbow_p, rec_ground_truth_wrist_p = out

In [None]:
def decimate(inp, OS_FACTOR=OS_FACTOR):
    return inp[OS_FACTOR//2::OS_FACTOR]

new_out = list(map(decimate, out))
rec_ground_truth_q, rec_ground_truth_vq, rec_ground_truth_aq, rec_ground_truth_tauq, rec_ground_truth_shoulder_p, rec_ground_truth_elbow_p, rec_ground_truth_wrist_p = new_out
len(rec_ground_truth_q)


In [None]:
# We will feed the optimiser with the following parameters
# ground_truth_p = [p_shoulder_1, p_elbow_1, p_wrist_1, ..., p_shoulder_T, p_elbow_T, p_wrist_T], T and nq
# The optimisation variable is
# [q_1, q_2, ..., q_T, tauq_3, tauq_4, ..., tauq_T-2]

ground_truth_p = np.concatenate([
    np.concatenate([
        rec_ground_truth_shoulder_p[i], rec_ground_truth_elbow_p[i], rec_ground_truth_wrist_p[i]]
    ) for i in range(T)]
)

# Simple case here : we feed the "exact" solution to the optimiser
initial_optim_var = np.concatenate(rec_ground_truth_q + rec_ground_truth_tauq[2:-2])


In [None]:
from projectyl.dynamics.dynamics import process_var
shoulder_frame_id = arm_robot.model.getFrameId(SHOULDER)
elbow_frame_id = arm_robot.model.getFrameId(ELBOW)
wrist_frame_id = arm_robot.model.getFrameId(WRIST)


# Utility functions

In [None]:
def process_var(var, T, nq) -> Tuple[np.ndarray, np.ndarray]:
    # [q_1, q_2, ..., q_T, tauq_3, tauq_4, ..., tauq_T-2]

    # [ q1 , q2    q3  ,                       , qT-1, qT]
    # [ -    - , tauq_3, tauq_4, ..., tauq_T-2 ,  -   -  ]
    tq_unnormalized = var[:T * nq].reshape(T, nq) # tq =[q_1, q_2, ..., q_T]
    ttauq = var[T * nq:] # ttauq = [tauq_3, tauq_4, ..., tauq_T-2]


    # Get tq

    # Normalize shoulder
    shoulder_quaternion_unnormalized = tq_unnormalized[:, :4] # Shouler quaternion 4 premières valeurs de q

    shoulder_quaternion_norm = np.linalg.norm(
        shoulder_quaternion_unnormalized, 
        axis=1,
        keepdims=True
    )
    
    shoulder_quaternion_normalized = shoulder_quaternion_unnormalized / shoulder_quaternion_norm

    # Normalize elbow
    elbow_angle_unnormalized = tq_unnormalized[:, 4:].reshape(T, 2)

    elbow_angle_norm = np.linalg.norm(
        elbow_angle_unnormalized,
        axis=1,
        keepdims=True
    )

    elbow_angle_normalized = elbow_angle_unnormalized / elbow_angle_norm

    tq = np.concatenate(
        (
            shoulder_quaternion_normalized, 
            elbow_angle_normalized
        ), 
        axis=1
    )
    assert tq.shape == tq_unnormalized.shape

    return tq, ttauq

In [None]:
def get_velocity_acceleration(tq, T, nv, DT):

    tvq = np.empty((T - 2, nv))
    # nv=4

    # Pourquoi ces opérations ne sont pas vectorizés dans pinocchio ...
    for i in range(1, T - 1):
        tvq[i - 1] = pin.difference(arm_robot.model, tq[i - 1], tq[i + 1]) / (2 * DT)

    taq = (tvq[2:] - tvq[:-2]) / (2 * DT) 

    return tvq, taq

In [None]:
def get_3D_pose_velocity_acceleration(tq, T, DT):
    # [p_shoulder_1, p_elbow_1, p_wrist_1, ..., p_shoulder_T, p_elbow_T, p_wrist_T]

    tshoulder_p = np.empty((T, 3))
    telbow_p = np.empty((T, 3))
    twrist_p = np.empty((T, 3))

    for i in range(T):
        # Forward kinematics
        pin.framesForwardKinematics(arm_robot.model, arm_robot.data, tq[i])

        # Predicted 3D points
        tshoulder_p[i] = arm_robot.data.oMf[shoulder_frame_id].translation
        telbow_p[i] = arm_robot.data.oMf[elbow_frame_id].translation
        twrist_p[i] = arm_robot.data.oMf[wrist_frame_id].translation

    # Computes speed and acceleration
    tshoulder_v = (tshoulder_p[2:] - tshoulder_p[:-2]) / (2 * DT)
    telbow_v = (telbow_p[2:] - telbow_p[:-2]) / (2 * DT)
    twrist_v = (twrist_p[2:] - twrist_p[:-2]) / (2 * DT)

    tshoulder_a = (tshoulder_v[2:] - tshoulder_v[:-2]) / (2 * DT)
    telbow_a = (telbow_v[2:] - telbow_v[:-2]) / (2 * DT)
    twrist_a = (twrist_v[2:] - twrist_v[:-2]) / (2 * DT)

    data_pos3D = np.concatenate((tshoulder_p, telbow_p, twrist_p), axis=1).flatten()
    velocity = np.concatenate(
        (tshoulder_v, telbow_v, twrist_v), axis=0).flatten()
    acceleration = np.concatenate(
        (tshoulder_a, telbow_a, twrist_a), axis=0).flatten()
    return data_pos3D, velocity, acceleration

# Cost terms definitions

In [None]:
# If the Lagrange dynamics formulation cannote be totally satisfied,
# one can relax by minimizing the difference between:
# - the Lagrange dynamics torques tau_rec predicited from q, vq and aq.
# - the current predicted torques ttauq
# question: why not also going backward in time ? ici middle point

def full_body_dynamics(tq, tvq, taq, T, nv):
    
    ttau = np.empty((T - 4, nv))

    for i in range(2, T - 2):
        ttau[i - 2] = pin.rnea(arm_robot.model, arm_robot.data, tq[i], tvq[i - 1], taq[i - 2])

    return ttau.flatten()

In [None]:
# Build the cost function
def objective(var, observed_p, T, nq, nv ,debug=False) -> np.ndarray:
    tq, ttauq = process_var(var, T, nq)
    tvq, taq = get_velocity_acceleration(tq, T, nv, DT)
    tp, tv, ta = get_3D_pose_velocity_acceleration(tq, T, DT)
    ttau = full_body_dynamics(tq, tvq, taq, T, nv)


    if debug :
        print("Diff between 3D pose :", np.linalg.norm(observed_p - tp))
        print("Smooth velocity :", np.linalg.norm(tv))
        print("Smooth acceleration :", np.linalg.norm(ta))
        print("Smooth torque :", np.linalg.norm(ttauq))
        print("Dynamics :", np.linalg.norm(ttau - ttauq))

    res_p = observed_p - tp
    mask_p = np.abs(res_p) > 1
    res_p[mask_p] = 2 * np.sqrt(np.abs(res_p[mask_p])) - 1

    res = np.concatenate([
        (res_p), 
        0.6 * tv,
        0.6 * ta,
        0.6 * ttauq,
        2 * (ttau - ttauq),
    ])

    return res

In [None]:
tq_gt = np.vstack(rec_ground_truth_q)[2:-2]
tvq_gt = np.vstack(rec_ground_truth_vq)[2:-2]
ttauq_gt = np.vstack(rec_ground_truth_tauq)[2:-2]

In [None]:
gt_tot = np.concatenate((tq_gt, tvq_gt, ttauq_gt), axis=1)

In [None]:
#plot_optimization_curves([(gt_tot, "[gt]", "--"),])

# Plotting
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

# Plotting each array
for i, tpl in enumerate([(tq_gt, "Q state"), (tvq_gt, "Velocity"), (ttauq_gt, "Torque")]):
    array, title = tpl
    axes[i].plot(array)
    axes[i].set_title('Evolution of ' + title)
    axes[i].set_xlabel('Time')

plt.tight_layout()
plt.show()

# Optimization

In [None]:
initial_var = initial_optim_var.copy()
initial_var[:T * arm_robot.model.nq] += 0.01 * np.random.rand(T * arm_robot.model.nq)
initial_var[T * arm_robot.model.nq:] = 0.
tq_init, _ = process_var(initial_var, T, arm_robot.model.nq)
tvq_init, taq_init = get_velocity_acceleration(tq_init, T, arm_robot.model.nv, DT)
initial_var[T * arm_robot.model.nq:] = full_body_dynamics(tq_init, tvq_init, taq_init, T, arm_robot.model.nv)

In [None]:
sol = least_squares(objective, initial_var, args=(ground_truth_p, T, arm_robot.model.nq, arm_robot.model.nv), method='lm').x

In [None]:
np.linalg.norm(objective(sol, ground_truth_p, T, arm_robot.model.nq, arm_robot.model.nv, True))

In [None]:
np.linalg.norm(objective(initial_optim_var, ground_truth_p, T, arm_robot.model.nq, arm_robot.model.nv, True))

In [None]:
tq, ttauq = process_var(sol, T, arm_robot.model.nq)

In [None]:
vq, va = get_velocity_acceleration(tq, T, arm_robot.model.nv, DT)

In [None]:
estimation = np.concatenate([tq[2:-2], vq[1:-1], ttauq.reshape(T-4, arm_robot.model.nv)], axis=1)

In [None]:
#interactive_replay_sequence(
#    {
#        "estimated": res[:, :arm_robot.model.nq],
#        "groundtruth": gt_sol[:, :arm_robot.model.nq],
#    },
#    viz
#)

In [None]:
#plot_optimization_curves([
#   (gt_tot, "[gt]", "--"),
#   (estimation, "[estimation]", "-"),
#   ]
#)
#plot_optimization_curves([
#   (gt_tot, "[gt]", "--"),
#   ]
#)
#
#plot_optimization_curves([
#   (estimation, "[estimation]", "-"),
#   ]
#)

In [None]:
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

# Plotting each array
for i, tpl in enumerate([(tq_gt, "Q state"), (tvq_gt, "Velocity"), (ttauq_gt, "Torque")]):
    array, title = tpl
    axes[i].plot(array)
    axes[i].set_title('Evolution of ' + title)
    axes[i].set_xlabel('Time')

plt.tight_layout()
plt.show()

In [None]:
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

# Plotting each array
for i, tpl in enumerate([(tq[2:-2], "Q state"), (vq[1:-1], "Velocity"), (ttauq.reshape(T-4, arm_robot.model.nv), "Torque")]):
    array, title = tpl
    axes[i].plot(array)
    axes[i].set_title('Evolution of ' + title)
    axes[i].set_xlabel('Time')

plt.tight_layout()
plt.show()

In [None]:
interactive_replay_sequence(
    {
        "estimated": estimation[:, :arm_robot.model.nq],
        "groundtruth": gt_tot[:, :arm_robot.model.nq],
    },
    viz
)