In [1]:
import abc
import numpy as np
import jax.numpy as jnp
from jax import jacfwd, hessian, jit
from manifpy import SE3, SE3Tangent, SO3, SO3Tangent

In [2]:
def skew( w ):
    if isinstance(w, np.ndarray) or isinstance(w, jnp.ndarray) and w.shape == (3,) or w.shape == (3, 1):
        return np.array([
            [0, -w[2], w[1]],
            [w[2], 0, -w[0]],
            [-w[1], w[0], 0]
        ])
    else:
        raise ValueError("Input must be a 3d vector")

w = np.array([1., 2., 3.])
print(skew(w))

[[ 0. -3.  2.]
 [ 3.  0. -1.]
 [-2.  1.  0.]]


In [3]:
def skew( w ):
    return SO3Tangent(w).hat()

w = np.array([1, 2, 3])
print(skew(w))

[[ 0. -3.  2.]
 [ 3.  0. -1.]
 [-2.  1.  0.]]


In [4]:
def adjoint(x):
    w = np.array([x[0], x[1], x[2]])
    v = np.array([x[3], x[4], x[5]])
    adx = np.block([
        [skew(w), np.zeros((3, 3))],
        [skew(v), skew(w)]
    ])
    return adx

x = [1, 2, 3, 4, 5, 6]
result = adjoint(x)
print(result)

[[ 0. -3.  2.  0.  0.  0.]
 [ 3.  0. -1.  0.  0.  0.]
 [-2.  1.  0.  0.  0.  0.]
 [ 0. -6.  5.  0. -3.  2.]
 [ 6.  0. -4.  3.  0. -1.]
 [-5.  4.  0. -2.  1.  0.]]


In [5]:
class BaseDynamics():

    """Dynamics Model."""

    @property
    @abc.abstractmethod
    def state_size(self):
        """State size."""
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def action_size(self):
        """Action size."""
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def has_hessians(self):
        """Whether the second order derivatives are available."""
        raise NotImplementedError

    @abc.abstractmethod
    def f(self, x, u, i):
        """Dynamics model.

        Args:
            x: Current state [state_size].
            u: Current control [action_size].
            i: Current time step.

        Returns:
            Next state [state_size].
        """
        raise NotImplementedError

    @abc.abstractmethod
    def f_x(self, x, u, i):
        """Partial derivative of dynamics model with respect to x.

        Args:
            x: Current state [state_size].
            u: Current control [action_size].
            i: Current time step.

        Returns:
            df/dx [state_size, state_size].
        """
        raise NotImplementedError

    @abc.abstractmethod
    def f_u(self, x, u, i):
        """Partial derivative of dynamics model with respect to u.

        Args:
            x: Current state [state_size].
            u: Current control [action_size].
            i: Current time step.

        Returns:
            df/du [state_size, action_size].
        """
        raise NotImplementedError

    @abc.abstractmethod
    def f_xx(self, x, u, i):
        """Second partial derivative of dynamics model with respect to x.

        Note:
            This is not necessary to implement if you're planning on skipping
            Hessian evaluation as the iLQR implementation does by default.

        Args:
            x: Current state [state_size].
            u: Current control [action_size].
            i: Current time step.

        Returns:
            d^2f/dx^2 [state_size, state_size, state_size].
        """
        raise NotImplementedError

    @abc.abstractmethod
    def f_ux(self, x, u, i):
        """Second partial derivative of dynamics model with respect to u and x.

        Note:
            This is not necessary to implement if you're planning on skipping
            Hessian evaluation as the iLQR implementation does by default.

        Args:
            x: Current state [state_size].
            u: Current control [action_size].
            i: Current time step.

        Returns:
            d^2f/dudx [state_size, action_size, state_size].
        """
        raise NotImplementedError

    @abc.abstractmethod
    def f_uu(self, x, u, i):
        """Second partial derivative of dynamics model with respect to u.

        Note:
            This is not necessary to implement if you're planning on skipping
            Hessian evaluation as the iLQR implementation does by default.

        Args:
            x: Current state [state_size].
            u: Current control [action_size].
            i: Current time step.

        Returns:
            d^2f/du^2 [state_size, action_size, action_size].
        """
        raise NotImplementedError
    

In [6]:
a = np.random.rand(11)
a[:3]

array([0.1342436 , 0.18268343, 0.13660554])

In [7]:
N = 4
X_ref = np.random.rand(N,2,1)
xi_ref = np.random.rand(N,3,1)
# print(X_ref)
# print(xi_ref)
np.concatenate((X_ref,xi_ref),axis=1)

array([[[9.09760723e-01],
        [5.56910452e-01],
        [6.90337152e-01],
        [8.11175062e-01],
        [6.28056902e-01]],

       [[9.33660288e-01],
        [6.81163232e-04],
        [8.06753384e-01],
        [4.32133667e-01],
        [2.78692060e-01]],

       [[3.50610663e-01],
        [5.10410525e-01],
        [2.72334578e-01],
        [2.00196098e-02],
        [5.13856883e-01]],

       [[6.06513187e-01],
        [1.56648813e-01],
        [8.81634213e-01],
        [8.26414887e-01],
        [6.75508728e-01]]])

In [18]:
A = np.random.rand(6,6)
A = A @ A.T

In [20]:
%%time
L = np.linalg.cholesky(A)
Linv = np.linalg.solve( L, np.identity(6) )
Ainv = Linv.T @ Linv
print(Ainv)

[[ 8.90762974 -6.30882372 -3.80948189 -0.58571977  0.61063064  0.25952718]
 [-6.30882372 10.11889306  0.65236464 -0.83582534 -1.45124707 -0.28781569]
 [-3.80948189  0.65236464  4.87428303 -0.42933674 -0.96889444 -0.09944609]
 [-0.58571977 -0.83582534 -0.42933674  2.10913958  0.34322109 -1.04702776]
 [ 0.61063064 -1.45124707 -0.96889444  0.34322109  2.29916235 -1.15285638]
 [ 0.25952718 -0.28781569 -0.09944609 -1.04702776 -1.15285638  3.03238227]]
CPU times: user 828 µs, sys: 0 ns, total: 828 µs
Wall time: 838 µs


In [21]:
%%time
print( np.linalg.inv(A) )

[[ 8.90762974 -6.30882372 -3.80948189 -0.58571977  0.61063064  0.25952718]
 [-6.30882372 10.11889306  0.65236464 -0.83582534 -1.45124707 -0.28781569]
 [-3.80948189  0.65236464  4.87428303 -0.42933674 -0.96889444 -0.09944609]
 [-0.58571977 -0.83582534 -0.42933674  2.10913958  0.34322109 -1.04702776]
 [ 0.61063064 -1.45124707 -0.96889444  0.34322109  2.29916235 -1.15285638]
 [ 0.25952718 -0.28781569 -0.09944609 -1.04702776 -1.15285638  3.03238227]]
CPU times: user 478 µs, sys: 0 ns, total: 478 µs
Wall time: 471 µs


In [22]:
a = np.random.rand(3,5)
b = np.random.rand(4,5)
print(np.block([
        [a],
        [b]
        ]))
print( np.vstack((a,b)) )

[[0.81395441 0.21674422 0.16729864 0.82036577 0.11535571]
 [0.36515531 0.02512284 0.72556812 0.75765404 0.10534791]
 [0.13952407 0.82226957 0.12387419 0.99658787 0.07102346]
 [0.91897771 0.63753959 0.21432326 0.19429543 0.08449382]
 [0.81716921 0.67154942 0.45811946 0.5770045  0.82114448]
 [0.55512006 0.25611344 0.7711589  0.70296926 0.38319559]
 [0.27738313 0.17019412 0.71301256 0.74763261 0.91568563]]
[[0.81395441 0.21674422 0.16729864 0.82036577 0.11535571]
 [0.36515531 0.02512284 0.72556812 0.75765404 0.10534791]
 [0.13952407 0.82226957 0.12387419 0.99658787 0.07102346]
 [0.91897771 0.63753959 0.21432326 0.19429543 0.08449382]
 [0.81716921 0.67154942 0.45811946 0.5770045  0.82114448]
 [0.55512006 0.25611344 0.7711589  0.70296926 0.38319559]
 [0.27738313 0.17019412 0.71301256 0.74763261 0.91568563]]


In [24]:
class ErrorStateSE3Dynamics(BaseDynamics):

    """Error-State SE(3) Dynamics Model, with Only First-order Derivative"""

    def __init__(self, J, X_ref, xi_ref, dt, integration_method="euler",
                    state_size=(6,6), action_size=6, hessians=False, **kwargs):
        """Constructs an Dynamics model for SE(3).

        Args:
            J: Inertia matrix, diag(I_b, m * I_3), 
                m : body mass,
                I_b : moment of inertia in the body frame.
            X_ref: List of Lie Group reference, (N, error_state_size, 1)
            xi_ref: List of velocity reference, described in Lie Algebra,
                 (N, velocity_size, 1)
            dt: Sampling time
            state_size: Tuple of State variable dimension, 
                ( error state size, velocity state size ).
            action_size: Input variable dimension.
            hessians: Evaluate the dynamic model's second order derivatives.
                Default: only use first order derivatives. (i.e. iLQR instead
                of DDP).
            **kwargs: Additional keyword-arguments to pass to any potential 
                function, e.g. in the preivious version `theano.function()`.
        """

        self._state_size = state_size[0] + state_size[1]
        self._error_state_size = state_size[0] 
        self._vel_state_size = state_size[1] 
        self._action_size = action_size

        self._X_ref = X_ref
        self._xi_ref = xi_ref
        self._x_ref = np.concatenate(( X_ref, xi_ref ), axis = 1)

        self._Ib = J[0:3, 0:3] 
        self._m = J[4,4]
        self._J = J
        self._Jinv = np.linalg.inv(J)
        
        if X_ref.shape[0] != xi_ref.shape[0]:
            raise ValueError("Group reference X and velocity reference should share the same time horizon")
        self._horizon = X_ref.shape[0]
        self._dt = dt

        self.integration_method = integration_method
        if integration_method == "euler":
            self.f = self.fd_euler
        elif integration_method == "rk4":
            self.f = self.fd_rk4
        else:
            raise ValueError("Invalid integration method. Choose 'euler' or 'rk4'.")
        
        self._f = jit(f)
        self._f_x = jit(jacfwd(f))
        self._f_u = jit(jacfwd(f, argnums=1))

        self._has_hessians = hessians
        if hessians:
            self._f_xx = jit(hessian(f, argnums=0))
            self._f_ux = jit(jacfwd( jacfwd(f, argnums=1) ))
            self._f_uu = jit(hessian(f, argnums=1))

        super(ErrorStateSE3Dynamics, self).__init__()

    @property
    def state_size(self):
        """State size."""
        return self._state_size

    @property
    def error_state_size(self):
        """Error-state size."""
        return self._error_state_size

    @property
    def vel_state_size(self):
        """Velocity state size."""
        return self._vel_state_size

    @property
    def action_size(self):
        """Action size."""
        return self._action_size

    @property
    def has_hessians(self):
        """Whether the second order derivatives are available."""
        return self._has_hessians

    @property
    def Ib(self):
        """Moment of inertia in the body frame."""
        return self._Ib

    @property
    def m(self):
        """Mass of the system."""
        return self._Ib

    @property
    def J(self):
        """Inertia matrix of the system."""
        return self._J
    
    @property
    def Jinv(self):
        """Inverse of the inertia matrix."""
        return self._Jinv

    @property
    def horizon(self):
        """The horizon for dynamics to be valid, due to horizon of given reference."""
        return self._horizon
    
    @property
    def dt(self):
        """Sampling time of the system dynamics."""
        return self._dt

    def xi_ref(self, i) :
        """Return the Lie Algebra velocity xi reference xi_ref at time index i."""
        return self._xi_ref(i)

    def X_ref(self, i) :
        """Return the Lie group reference X_ref at time index i."""
        return self._X_ref(i)

    def x_ref(self, i) :
        """Return the concatenated Lie group and Lie algebra reference X_ref at time index i."""
        return self._x_ref(i)
    
    def skew( self, w ):
        """Get the isomorphic element in the Lie algebra for SO3, 
            i.e. the skew symmetric matrix."""
        if isinstance(w, np.ndarray) or isinstance(w, jnp.ndarray) and w.shape == (3,) or w.shape == (3, 1):
            return np.array([
                [0, -w[2], w[1]],
                [w[2], 0, -w[0]],
                [-w[1], w[0], 0]
            ])
        else:
            raise ValueError("Input must be a 3d vector")

    def adjoint( self, xi ):
        """ Get the the adjoint matrix representation of Lie Algebra."""
        w = np.array([xi[0], xi[1], xi[2]])
        v = np.array([xi[3], xi[4], xi[5]])
        adx = np.block([
            [self.skew(w), np.zeros((3, 3))],
            [self.skew(v), self.skew(w)]
        ])
        return adx
    
    def coadjoint( self, xi ):
        """ Get the the coadjoint matrix representation of Lie Algebra."""
        return self.adjoint(xi).T

    def fc(self, x, u, i):
        """ Continuous linearized dynamicsf.

        Args:
            x: Current state [state_size], stacked by
                error-state and velocity, both on Lie Algebra
            u: Current control [action_size].
            i: Current time step.

        Returns:
            Next state [state_size].
        """

        # psi = x[:self.error_state_size]
        xi = x[-self.vel_state_size:]
        omega = xi[:3]
        v = xi[-3:]

        G = np.block([
            [self.skew( self.Ib @ omega ), self.m * self.skew( v )],
            [self.m * self.skew( v ), np.zeros((3,3))],        
        ])
        Ht = - self.Jinv @ ( self.coadjoint( xi ) @ self.J + G )
        bt = - self.Jinv @ G @ xi

        At = np.stack([
            [- self.adjoint( self.xi_ref(i) ), np.identity( self.error_state_size )],
            [np.zeros((self.vel_state_size, self.error_state_size)), Ht]
        ])
        Bt = np.vstack((np.zeros((self.error_state_size, self.action_size)),
                self.Jinv ))
        ht = np.vstack( (-self.xi_ref(i), bt ))

        xt_dot = At @ x + Bt @ u + ht
        
        return xt_dot
    
    def fd_euler( self, x, u, i ):
        """ Descrtized dynamics with Eular method.

        Args:
            x: Current state [state_size], stacked by
                error-state and velocity, both on Lie Algebra
            u: Current control [action_size].
            i: Current time step.

        Returns:
            Next state [state_size].
        """
        return x + self.fc( x,u,i ) * self.dt
    
    def fd_rk4( self, x, u, i ):
        """ Descrtized dynamics with RK4 method.

        Args:
            x: Current state [state_size], stacked by
                error-state and velocity, both on Lie Algebra
            u: Current control [action_size].
            i: Current time step.

        Returns:
            Next state [state_size].
        """
        s1 = self.fc( x, u, i)
        s2 = self.fc( x+ self.dt/2*s1, u, i )
        s3 = self.fc( x+ self.dt/2*s2, u, i )
        s4 = self.fc( x+ self.dt*s3, u, i )
        x_next = x + self.dt/6 * ( s1 + 2 * s2 + 2 * s3 + s4 )
    
        return x_next

    def f_x(self, x, u, i):
        """Partial derivative of dynamics model with respect to x.

        Args:
            x: Current state [state_size].
            u: Current control [action_size].
            i: Current time step.

        Returns:
            df/dx [state_size, state_size].
        """
        return self._f_x(x,u,i)

    def f_u(self, x, u, i):
        """Partial derivative of dynamics model with respect to u.

        Args:
            x: Current state [state_size].
            u: Current control [action_size].
            i: Current time step.

        Returns:
            df/du [state_size, action_size].
        """
        return self._f_u(x,u,i)

    def f_xx(self, x, u, i):
        """Second partial derivative of dynamics model with respect to x.

        Args:
            x: Current state [state_size].
            u: Current control [action_size].
            i: Current time step.

        Returns:
            d^2f/dx^2 [state_size, state_size, state_size].
        """
        if not self._has_hessians:
            raise NotImplementedError

        return self._f_xx(x,u,i)

    def f_ux(self, x, u, i):
        """Second partial derivative of dynamics model with respect to u and x.

        Args:
            x: Current state [state_size].
            u: Current control [action_size].
            i: Current time step.

        Returns:
            d^2f/dudx [state_size, action_size, state_size].
        """
        if not self._has_hessians:
            raise NotImplementedError

        return self._f_ux(x,u,i)

    def f_uu(self, x, u, i):
        """Second partial derivative of dynamics model with respect to u.

        Args:
            x: Current state [state_size].
            u: Current control [action_size].
            i: Current time step.

        Returns:
            d^2f/du^2 [state_size, action_size, action_size].
        """
        if not self._has_hessians:
            raise NotImplementedError

        return self._f_uu(x,u,i)