In [181]:
from manifpy import SE2, SE2Tangent

import symforce
import symforce.symbolic as sf
from symforce.values import Values
from symforce import ops
from symforce.ops import StorageOps, GroupOps, LieGroupOps
from symforce.notebook_util import display
print(f"symforce uses {symforce.get_symbolic_api()} as backend")

import numpy as np
from numpy.linalg import inv
np.set_printoptions(precision=3, suppress=True)

# import pdb

Vector = np.array

def Covariance():
    return np.zeros((SE2.DoF, SE2.DoF))

def Jacobian():
    return np.zeros((SE2.DoF, SE2.DoF))

def disp_info(elm, name=''):
    print("=========INFO==========")
    print(f"The shape and equation of {name}:")
    display(elm)
    display(elm.shape)
    print("=======================\n")


symforce uses symengine as backend


In [192]:
################################
# START CONFIGURATION

NUMBER_OF_LMKS_TO_MEASURE = 3

# Define the robot pose element and its covariance
X_simulation = SE2.Identity()
X = SE2.Identity()
X_unfiltered = SE2.Identity()
P = Covariance()

u_nom = Vector([0.1, 0.0, 0.05])
u_sigmas = Vector([0.1, 0.1, 0.1])
U = np.diagflat(np.square(u_sigmas))

# Declare the Jacobians of the motion wrt robot and control
J_x = Jacobian()
J_u = Jacobian()

# Define five landmarks in R^2
landmarks = []
landmarks.append(Vector([2.0,  0.0]))
landmarks.append(Vector([2.0,  1.0]))
landmarks.append(Vector([2.0, -1.0]))

# Define the beacon's measurements
measurements = [Vector([0, 0])] * NUMBER_OF_LMKS_TO_MEASURE

y_sigmas = Vector([0.01, 0.01])
R = np.diagflat(np.square(y_sigmas))

# Declare some temporaries
J_xi_x = Jacobian()
J_e_xi = np.zeros((SE2.Dim, SE2.DoF))


##############################
# EXPERIEMENTS

# vis_latex = True 
vis_latex = False 

# fix randomness of the noise below
np.random.seed(0)
    
# Experiment Summary: Make 10 steps. Measure up to three landmarks each time.
for t in range(10):
    print(f'-- step {t} ---------------------------------------------')

    ###########
     # I. Generation of Simulation data
    ###########
    
    # simulate noise
    u_noise = u_sigmas * np.random.rand(SE2.DoF)        # control noise
    u_noisy = u_nom + u_noise                           # noisy control

    u_simu = SE2Tangent(u_nom)
    u_est = SE2Tangent(u_noisy)
    u_unfilt = SE2Tangent(u_noisy)

    # first we move
    X_simulation = X_simulation + u_simu                # overloaded X.rplus(u) = X * exp(u)

    # then we measure all landmarks
    for i in range(NUMBER_OF_LMKS_TO_MEASURE):
        b = landmarks[i]                                # lmk coordinates in world frame

        # simulate noise
        y_noise = y_sigmas * np.random.rand(SE2.Dim)    # measurement noise

        y = X_simulation.inverse().act(b)               # landmark measurement, before adding noise

        y = y + y_noise                                 # landmark measurement, noisy
        measurements[i] = y                             # store for the estimator just below

    ###########
     # II. Estimation
    ###########

    # II-a. First we move
    X = X.plus(u_est, J_x, J_u) # state propagation, X * exp(u), with Jacobians
    P = J_x @ P @ J_x.transpose() + J_u @ U @ J_u.transpose() # covariance propagation 

    # Then we correct using the measurements of each landmarks
    for i in range(NUMBER_OF_LMKS_TO_MEASURE):

        b = landmarks[i] # lmk coordinates in world frame
        y = measurements[i] # lmk measurement, noisy        
        print(f"\ncurrent X is")
        print(f" trans is {X.translation()}")
        print(f"  x is {X.x():.3f}")
        print(f"  y is {X.y():.3f}")
        print(f" angle is {X.angle():.3f}")
              
        # mode selection 
        use_manif, use_symforce = True, False
#         use_manif, use_symforce = False, True 
        
        # expectation (way 1, the original: manif ver.)
        if use_manif:
            # exactly the original manif example code
            e = X.inverse(J_xi_x).act(b, J_e_xi)            # note: e = R.tr * ( b - t ), for X = (R,t).
            H = J_e_xi @ J_xi_x                             # Jacobian of the measurements wrt the robot pose. note: H = J_e_x = J_e_xi * J_xi_x
            E = H @ P @ H.transpose()
            
            print(f"\ne (calc by manif): {e}")
            print(f"H (calc by manif):\n {H}")
            print(H.shape)

        # expectation (way 2, symforce ver., which is the equivalent)    
        if use_symforce:
            # i. build a equation 
            trans_vec = sf.V2.symbolic("t")
            rot_ang = sf.V1.symbolic("theta")
            rot_mat = LieGroupOps.from_tangent(sf.Rot2, rot_ang)
            landmark = sf.V2.symbolic("L")

            error = rot_mat.inverse() * (landmark - trans_vec) # note: e = R.tr * ( b - t ), for X = (R,t).

            Je_trans = error.jacobian(trans_vec)
            Je_rot_ang = error.jacobian(rot_ang)
                # note that we don't need to explicitly care 
                #  the seperated elements for the chain rule (e.g., J_e_xi, J_xi_x)
                #   because SymForce takes care of it internally. 
            
            # ii. inject the values to the error model 
            def inject_values(model):
                val = model.subs({ \
                    trans_vec: sf.V2(X.x(), X.y()),
                    rot_ang: sf.V1(X.angle()), 
                    landmark: sf.V2(b)
                })
                return val.to_numpy()

            e = inject_values(error).transpose().squeeze()
            H_trans = inject_values(Je_trans) 
            H_theta = inject_values(Je_rot_ang) 
            H = np.hstack((H_trans, H_theta))
            E = H @ P @ H.transpose()

            print(f"\ne (calc by symforce): {e}")
            print(f"H (calc by symforce):\n {H}")
            print(H.shape)

            if vis_latex:
                disp_info(error, "error")
                disp_info(Je_trans, "Je_trans")
                disp_info(Je_rot_ang, "Je_rot_ang")
                vis_latex = False 

        # innovation
        z = y - e
        Z = E + R

        # Kalman gain
        K = P @ H.transpose() @ inv(Z)                  # K = P * H.tr * ( H * P * H.tr + R).inv
        # TODO: stacking H (i.e., gathering all information for all landmarks, and update once)
        
        # Correction step
        dx = K @ z                                      # dx is in the tangent space at X

        # II-b. Update
        X = X + SE2Tangent(dx)                          # overloaded X.rplus(dx) = X * exp(dx)
        P = P - K @ Z @ K.transpose()


    # III. Unfiltered (for comparison)

    # move also an unfiltered version for comparison purposes
    X_unfiltered = X_unfiltered + u_unfilt

    # IV. Results

    # DEBUG
    print(' ')
    print('X simulated : ', X_simulation.log().coeffs().transpose())
    print('X estimated : ', X.log().coeffs().transpose())
    print('X unfilterd : ', X_unfiltered.log().coeffs().transpose())
    print('-------------------------------------------------------\n')
    # END DEBUG

-- step 0 ---------------------------------------------

current X is
 trans is [0.151 0.08 ]
  x is 0.151
  y is 0.080
 angle is 0.110

e (calc by manif): [ 1.829 -0.283]
H (calc by manif):
 [[-1.     0.    -0.283]
 [ 0.    -1.    -1.829]]
(2, 3)

current X is
 trans is [0.103 0.035]
  x is 0.103
  y is 0.035
 angle is 0.030

e (calc by manif): [1.924 0.909]
H (calc by manif):
 [[-1.     0.     0.909]
 [ 0.    -1.    -1.924]]
(2, 3)

current X is
 trans is [ 0.098 -0.008]
  x is 0.098
  y is -0.008
 angle is 0.053

e (calc by manif): [ 1.847 -1.092]
H (calc by manif):
 [[-1.    -0.    -1.092]
 [-0.    -1.    -1.847]]
(2, 3)
 
X simulated :  [0.1  0.   0.05]
X estimated :  [ 0.095 -0.004  0.049]
X unfilterd :  [0.155 0.072 0.11 ]
-------------------------------------------------------

-- step 1 ---------------------------------------------

current X is
 trans is [0.225 0.091]
  x is 0.225
  y is 0.091
 angle is 0.152

e (calc by manif): [ 1.741 -0.358]
H (calc by manif):
 [[-1.    -0