In [None]:
import sympy as sp, numpy as np
from sympy import Matrix as Mat

# sp.init_printing(use_latex=False, use_unicode=False)

%load_ext autoreload
%autoreload 2
import ckc_lib
import physical_education as pe

In [None]:
constants = mb, mu, ml, rb, ru, rl, g = sp.symbols('m_b m_u m_l r_b r_u r_l g', real=True)
constants_mapping = dict(zip(constants, (4.1, 0.090, 0.090, 0.15, 0.1375, 0.250, 9.81)))

Ib, Iu, Il = mb*(rb**2 + rb**2)/12, (mu*ru**2)/12, (ml*rl**2)/12
Ib = 0

# two input torques
τ = τl, τr = Mat(sp.symbols('\\tau_:2', real=True))

# two constraint forces
Fr = Mat([*sp.symbols('N_x N_y', real=True)])

# two ground reaction forces
Lx = Mat(sp.symbols('L_x:2', real=True))
Ly = sp.symbols('L_y', real=True)

Mat([*constants, *τ, *Fr, *Lx, Ly]).T

In [None]:
# reduntant_foot_pos = True

# the states of the leg
def sym_and_derivs(symbol: str):
    return sp.symbols(r'%s \dot{%s} \ddot{%s}' % (symbol, symbol, symbol))

xb, dxb, ddxb = sym_and_derivs('x')
yb, dyb, ddyb = sym_and_derivs('y')

thb, dthb, ddthb = sym_and_derivs('\\theta_b')
thul, dthul, ddthul = sym_and_derivs('\\theta_{ul}')
thur, dthur, ddthur = sym_and_derivs('\\theta_{ur}')
thll, dthll, ddthll = sym_and_derivs('\\theta_{ll}')
thlr, dthlr, ddthlr = sym_and_derivs('\\theta_{lr}')

# if reduntant_foot_pos:
#     fx, dfx, ddfx = sym_and_derivs('f_x')
#     fy, dfy, ddfy = sym_and_derivs('f_y')
#     _qf = [fx, fy]; _dqf = [dfx, dfy]; _ddqf = [ddfx, ddfy]; _B = [0, 0]
# else:
#     fx = dfx = ddfx = fy = dfy = ddfy = _qf = _dqf = _ddqf = _B = ()

q   = Mat([  xb,   yb,   thb,   thul,   thur,   thll,   thlr])#,   *_qf])
dq  = Mat([ dxb,  dyb,  dthb,  dthul,  dthur,  dthll,  dthlr])#,  *_dqf])
ddq = Mat([ddxb, ddyb, ddthb, ddthul, ddthur, ddthll, ddthlr])#, *_ddqf])
B   = Mat([   0,    0, τl+τr,    -τl,    -τr,      0,      0])#,    *_B])

# q, dq, ddq, B

In [None]:
# the base
Pb_I = Mat([xb, yb])
Rb_I = pe.utils.rot(thb).T

# connection between base and top links
conn_b = Pb_I

conn_ls = conn_b #+ Rb_I @ Mat([-rb/2, 0])
conn_rs = conn_b #+ Rb_I @ Mat([ rb/2, 0])

# the top links (shoulder to knee)
Pul_ul = Mat([0, -ru/2])
Rul_I = pe.utils.rot(thul).T
Pul_I = sp.trigsimp(conn_ls + Rul_I @ Pul_ul)

conn_lk = conn_ls + Rul_I @ Mat([0, -ru])

Pur_ur = Mat([0, -ru/2])
Rur_I = pe.utils.rot(thur).T
Pur_I = sp.trigsimp(conn_rs + Rur_I @ Pur_ur)

conn_rk = conn_rs + Rur_I @ Mat([0, -ru])

# the bottom links (knee to foot)
Pll_ll = Mat([0, -rl/2])
Rll_I = pe.utils.rot(thll).T
Pll_I = sp.trigsimp(conn_lk + Rll_I @ Pll_ll)

conn_lf = conn_lk + Rll_I @ Mat([0, -rl])

Plr_lr = Mat([0, -rl/2])
Rlr_I = pe.utils.rot(thlr).T
Plr_I = sp.trigsimp(conn_rk + Rlr_I @ Plr_lr)

conn_rf = conn_rk + Rlr_I @ Mat([0, -rl])

# Geometry constraints

In [None]:
geom_constraints = Mat(conn_rf - conn_lf)
J_c = sp.trigsimp(geom_constraints.jacobian(q))

# Contact constraints
Using the left foot

In [None]:
J_L = conn_lf.jacobian(q)
foot_dx = Mat([conn_lf[0]]).jacobian(q) @ dq

D = np.array([[ 1, 0],
              [-1, 0]])

L = Mat(D).T @ Lx + Mat([0, Ly])

# Manipulator equations

In [None]:
# calculate the system's kinetic and potential energy
Ps = [Pb_I, Pul_I, Pur_I, Pll_I, Plr_I]
Rs = [Rb_I, Rul_I, Rur_I, Rll_I, Rlr_I]
dths = dq[2:]
Is = [  Ib,    Iu,    Iu,    Il,    Il]
ms = [  mb,    mu,    mu,    ml,    ml]

dxy = [P.jacobian(q) @ dq for P in Ps]

Ek = sp.trigsimp(sum(
    0.5*m*(dx**2 + dy**2) + 0.5*I*dth**2
        for m, I, (dx, dy), dth in zip(ms, Is, dxy, dths)
))

Ep = sp.trigsimp(sum(
    m*g*y for m, (_, y) in zip(ms, Ps)
))

In [None]:
# manipulator equation parts: mass matrix, coriolis, gravity/NC force
M, C, G = pe.utils.manipulator_equation(Mat([Ek]), Mat([Ep]), q, dq)

M = sp.simplify(M); G = sp.simplify(G)

In [None]:
EOM = sp.trigsimp((
    M @ ddq + C + G - B - J_L.T @ L - J_c.T @ Fr
).subs(constants_mapping))

In [None]:
BW = float(((mb + 2*mu + 2*ml)*g).subs(constants_mapping))

force_scalings = {}
for f in [*τ, *Fr, *Lx, Ly]:
    force_scalings[f] = f * BW

In [None]:
vars_in_EOM = [*q, *dq, *ddq, *τ, *Fr, *Lx, Ly]

EOM_func = pe.utils.lambdify_EOM(
    Mat([*EOM, *geom_constraints]).xreplace(force_scalings).xreplace(constants_mapping),
    vars_in_EOM
)

def lambdify(expr):
    return pe.utils.lambdify_EOM(expr.subs(constants_mapping), vars_in_EOM)

foot_height_func = lambdify(conn_lf[1])[0]
foot_dx_func = lambdify(foot_dx[0])[0]

# for plotting
joint_position_funcs = {
      'body': lambdify(   Pb_I),
    'l-shou': lambdify(conn_ls),
    'l-knee': lambdify(conn_lk),
    'l-foot': lambdify(conn_lf),

    'r-shou': lambdify(conn_rs),
    'r-knee': lambdify(conn_rk),
    'r-foot': lambdify(conn_rf),
}

# Pyomo time

In [None]:
def penalty_cost(m, contact_scale: float = 10.):
    return (sum(m.contact_penalty[:]) * contact_scale
            + sum(m.friction_penalty[:])
            + sum(m.slip_penalty[:, :]))

def torque_cost(m, BW: float):
    return BW**2 * sum(m.hm[fe]*(m.Tc[fe,T])**2 for fe in m.fe for T in m.Tc_set)

def transport_cost(m, var: str = 'x'):
    nfe = m.fe[-1]
    ncp = m.cp[-1]
    global BW
    return torque_cost(m, BW) / (BW * m.q[nfe,ncp,var])

In [None]:
def get_total_time(m):
    # pre-optimization
    if m.hm[1].value is None:
        nfe = m.fe[-1]
        return m.hm0.value * nfe
    
    # optimization has run
    else:
        return m.hm0.value * sum(m.hm[fe].value for fe in m.fe)

In [None]:
from typing import Optional
from pyomo.environ import Objective
def random_init(m, mu: float = 0, sigma: float = 0.1, seed: Optional[int] = None):
    """
        Randomly init the `q` and `foot_height` variables in `m`
    """
    import random
    if seed is not None:
        random.seed(seed)

    rand = lambda: random.gauss(mu=mu, sigma=sigma)
    for fe, cp in pe.utils.get_indexes(len(m.fe), len(m.cp), one_based=True, skipfirst=False):
        # random height between 0 and 0.5
        m.foot_height[fe, cp].value = min(max(rand(), 0), 0.5)
        
        for var in m.vars:
            m.q[fe,cp,var].value = rand()

def set_all_initial_dq_to_0(m):
    ncp = m.cp[-1]
    for var in m.vars:
        m.dq[1, ncp, var].fix(0)

def constrain_y_above_ground(m):
    for fe, cp in pe.utils.get_indexes(len(m.fe), len(m.cp), one_based=True, skipfirst=False):
        m.q[fe, cp, 'yb'].setlb(0)


def drop_test(m, from_height: float = 5, penalty_scale: float = 1000):
    ncp = m.cp[-1]
    
    random_init(m, mu=0, sigma=0.1)
    set_all_initial_dq_to_0(m)
    constrain_y_above_ground(m)
    
    m.q[1,ncp,'xb'].fix(0)
    m.q[1,ncp,'yb'].fix(from_height)
    m.foot_height[1,ncp].setub(from_height)
    
    pen_cost = penalty_cost(m, contact_scale=10)
    m.cost = Objective(expr = penalty_scale*pen_cost)
    
    return pen_cost



def stop_test(m, from_height: float = 2.5, penalty_scale: float = 1000, initial_vel: float = 1.0):
    nfe, ncp = m.fe[-1], m.cp[-1]
    random_init(m, mu=0, sigma=0.1)
    constrain_y_above_ground(m)
    
    m.q[1,ncp,'xb'].fix(0)
    m.q[1,ncp,'yb'].fix(from_height)
    m.dq[1,ncp,'xb'].fix(initial_vel)
    m.dq[1,ncp,'yb'].fix(0)
    
    m.dq[nfe,ncp,'xb'].fix(0)
    
    pen_cost = penalty_cost(m, contact_scale=10)
    m.cost = Objective(expr = penalty_scale*pen_cost)
    
    return pen_cost



def move_forward_test(m, penalty_scale: float = 1000):
    nfe, ncp = len(m.fe), len(m.cp)
        
    random_init(m, mu=0, sigma=0.1)
    constrain_y_above_ground(m)
    set_all_initial_dq_to_0(m)
    
    # start stationary
    m.q[1, ncp, 'xb'].fix(0)
    m.foot_height[1, ncp].fix(0)
    
    # straight angles
    m.q[1, ncp, 'thb'].fix(0)
    m.q[1, ncp, 'thul'].fix(-0.05)
    m.q[1, ncp, 'thur'].fix( 0.05)
    
    # move forward and up
    m.q[nfe, ncp, 'xb'].setlb(0.7)
    m.q[nfe, ncp, 'yb'].setlb(1.0)
    
    pen_cost = penalty_cost(m, contact_scale=10)
    CoT_cost = transport_cost(m, var='xb')
    m.cost = Objective(expr = penalty_scale*pen_cost + CoT_cost)
    
    return pen_cost



def linear_interp_guess(m, var: str, dist: float, avg_vel: float):
    nfe = m.fe[-1]
    for fe, cp in pe.utils.get_indexes(len(m.fe), len(m.cp), one_based=True, skipfirst=False):
        m.q[fe,cp,var].value = dist * (fe-1)/(nfe-1)
        m.dq[fe,cp,var].value = avg_vel

def periodic_gait_test(m, avg_vel: float, penalty_scale: float = 1000):
    nfe, ncp = len(m.fe), len(m.cp)
    total_time = get_total_time(m)
    
    random_init(m, mu=0, sigma=0.1)
    constrain_y_above_ground(m)
    linear_interp_guess(m, var='xb', dist=avg_vel*total_time, avg_vel=avg_vel)
    
    # fix start point
    m.q[1,ncp,'xb'].fix(0)
    
    @m.Constraint(m.vars)
    def periodic_vel_constr(m, var):
        return m.dq[1,ncp,var] == m.dq[nfe,ncp,var]
    
    @m.Constraint([v for v in m.vars if v != 'xb'])
    def periodic_constr(m, var):
        return m.q[1,ncp,var] == m.q[nfe,ncp,var]
    
    @m.Constraint()  # x_f = Δt * v_avg
    def avg_vel_constraint(m):
        return m.q[nfe,ncp,'xb'] == (sum(m.hm[:]) * m.hm0) * avg_vel #m/s
    
    pen_cost = penalty_cost(m, contact_scale=10)
    CoT_cost = transport_cost(m, var='xb')
    m.cost = Objective(expr = penalty_scale*pen_cost + CoT_cost)
    
    return pen_cost

In [None]:
# class LivePrinter():
#     def __init__(self, filename: str, figsize: tuple = (10, 6)):
#         self.filename = filename
#         self.figsize = figsize
    
#     def __enter__(self):
#         import live_printer, multiprocessing
        
#         self.should_stop = multiprocessing.Event()
#         proc = multiprocessing.Process(
#             target=live_printer.live_plotter,
#             args=(
#                 self.filename,
#                 self.figsize,
#                 lambda: self.should_stop.is_set(),
#             )
#         )
#         proc.start()
        
#     def __exit__(self, *args):
#         print('Shutting down')
#         self.should_stop.set()

In [None]:
# class LivePlotter():
#     def __init__(self, filename: str, figsize: tuple = (10, 6)):
#         self.filename = filename
#         self.figsize = figsize
    
#     def __enter__(self):
#         import live_printer, multiprocessing
        
#         self.should_stop = multiprocessing.Event()
#         proc = multiprocessing.Process(
#             target=live_printer.live_plotter,
#             args=(
#                 self.filename,
#                 self.figsize,
#                 lambda: self.should_stop.is_set(),
#             )
#         )
#         proc.start()
        
#     def __exit__(self, *args):
#         print('Shutting down')
#         self.should_stop.set()

In [None]:
from pyomo.environ import value as pyovalue

m = ckc_lib.pyomo_model(
    EOM_func, foot_height_func, foot_dx_func, nfe=50, total_time=0.4, collocation='radau'
)

# pen_cost = drop_test(m, from_height=1.)
# pen_cost = stop_test(m, from_height=1., initial_vel= 1.)
# pen_cost = move_forward_test(m)
pen_cost = periodic_gait_test(m, avg_vel=3, penalty_scale=1)

ret = pe.utils.default_solver(max_mins=20, solver='mumps', ipopt_path='ipopt', OF_hessian_approximation='limited-memory').solve(m, tee=True)

pe.visual.info(f'Penalty cost: {pyovalue(pen_cost)}')

In [None]:
rb

In [None]:
ckc_lib.make_animation(m, joint_position_funcs, h_m=2*get_total_time(m)/m.fe[-1], body_width=constants_mapping[rb])

In [None]:
ckc_lib.make_plots(m)

## Force control

Given equation:
$$M\ddot{q} + C\dot{q} + G = B \tau_m + Q + A^T F$$
we want to find $\tau_m$ = motor torque inputs

Assume we can ignore $M$ (especially because it introduces accelerations, which we want to avoid) and $Q$ (which models disturbances, which we'll ignore), solve:
$$\tau_m = B^{-1} (G + C - A^T F)$$

Questions:
1. how to handle gear ratio?
1. why did we calculate `dA`/$\dot{A}$?

In [None]:
# eps = sp.diag([0.001]*M.shape[1], unpack=True)
# sp.inv_quick(M + eps)
# Mi = M.inv('')#('GE', 'LU', 'ADJ', 'CH', 'LDL')

In [None]:
vec = lambda *args: sp.Matrix(args)

def time_derivative(vector):
    return vector.jacobian(vec(*q,*dq)) @ vec(*dq,*ddq)

def time_derivative_matrix(matrix):
    n,m = matrix.shape
    return time_derivative(matrix.reshape(n*m,1)).reshape(n,m)

In [None]:
# Jacobian and time derivative
A = conn_rf.jacobian(q)  # or: conn_lf
dA = time_derivative_matrix(A)
# dA = sp.zeros(*A.shape)

# constraint forces
# Mi = M.inv()
# λ = -(A @ Mi @ A.T).inv() * (A @ Mi @ (B + Q - C - G) + dA @ dq)
# λ = -inv(A*inv(M)*transpose(A))*(A*inv(M)*(B + Q - C - G) + dA*dq)
# λ = sp.simplify(λ)

# eqn = M@ddq + C + G - B - Q - A.T*λ
# eqn = sp.simplify(eqn)

A, dA

The EOM is implemented as:
```python
EOM = M @ ddq + C + G - B - J_L.T @ L - J_c.T @ Fr
```
so you need to be alert! `B` $== B\tau_m \ne B$

In [None]:
# sp.init_printing(wrap_line=True) #     num_columns: int or None

B_ = B.jacobian(τ)
B_inv = sp.inv_quick(B_.T @ B_) @ B_.T

assert B_inv @ B_ == Mat([[1, 0], [0, 1]])

A = J_L
F = Mat([Lx[0], Ly])

# τ_eqn = sp.simplify(B_inv @ (M@ddq + G + C - A.T @ F))
τ_eqn = sp.simplify(B_inv @ (G + C - A.T @ F))
# τ_eqn = sp.simplify(B_inv @ (G - A.T @ F))
τ_eqn

The ODrive does current control, not torque control: https://docs.odriverobotics.com/#current-control (although that's changing in a new version: https://github.com/madcowswe/ODrive/blob/v0.5.1-rc5/Firmware/odrive-interface.yaml#L955)

but for now we need to map current to torque

In [None]:
# torque constant - pretty much randomly chosen right now!
Kt = sp.Symbol('Kt')  # 0.08
current_input = sp.simplify((1/Kt * τ_eqn).subs(constants_mapping))

current_input

In [None]:
# from sympy.utilities.autowrap import ufuncify
# ufuncify([Lx[0], Ly, dthll, dthlr, thul, thur, thll, thlr], current_input[0])
cm = constants_mapping
pycode = f"""
# generated by `closed-kinematic-chain-hopper.ipynb`
def current_control(L_x: float, L_y: float,
                    th_ul: float, th_ll: float,
                    th_lr: float, th_ur: float,
                    dth_ll: float, dth_lr: float,
                    Kt: float = 0.08):
    \"\"\"
    Masses and lengths:
    mb = {cm[mb]:.3f}kg, mu = {cm[mu]:.3f}kg, ml = {cm[ml]:.3f}kg
    rb = {cm[rb]:.3f} m, ru = {cm[ru]:.3f} m, rl = {cm[rl]:.3f} m, g = {cm[g]:.3f}m/s^2
    
    Kt is the torque constant. Currently pretty much randomly chosen
    \"\"\"
    import math
    current_input = (
        {sp.pycode(current_input[0])},
        {sp.pycode(current_input[1])},
    )
    return current_input
""".replace('\\theta_','th_').replace('{', '').replace('}', '').replace('L_x0', 'L_x').replace(r'\dot', 'd')
print(pycode)
# print(sp.julia_code(current_input))

In [None]:
# expr = sp.Eq(G*0, (B + J_L.T @ Mat([Lx[0], Ly]))[:-1])
# G - (B + J_L.T @ Mat([Lx[0], Ly]))
# expr = Mat((B + J_L.T @ Mat([Lx[0], Ly]) - G)[:-1])
# expr
# sp.solve(expr, τ)

# Impedance control!

In [None]:
## 1. make virtual model of the leg: r, th

th1 = thul
th2 = thur
rp = ru * sp.cos((th1-th2)/2) + sp.sqrt(rl**2 - ru**2 * sp.sin((th1-th2)/2)**2)
#rp = sp.simplify(sp.sqrt(conn_rf[1]**2 + conn_rf[0]**2))
#thp = sp.atan2(conn_rf[1], conn_rf[0])
thp = (thul + thur)/2
rp, thp

In [None]:
## 2. find mapping from input torque to force on r and theta
def double_time_deriv(vector):
    return time_derivative(time_derivative(vector))

double_time_deriv(vec(rp, thp)).jacobian([ddthul, ddthur]) @ vec(τl, τr)

In [None]:
EOM[3:5]#ddq[3:5]