In [1]:
import numpy as np
import matplotlib.pyplot as plt
import math
import copy
import scipy.linalg

In [2]:
def rot2D(theta):
    return np.array([math.cos(theta), -math.sin(theta), math.sin(theta), math.cos(theta)]).reshape(2,2)

def rot3D(theta):
    o = np.eye(3)
    o[:2,:2] = rot2D(theta)
    return o

def numeraical_jacobian(pos1, pos2, model):
    eps = 1e-12
    pos_H = []
    neg_H = []
    for i,x in enumerate(pos1):
        v = copy.copy(pos1)
        v[i] = x + eps
        pos_H.append(model(v, pos2))
    for i,x in enumerate(pos2):
        v = copy.copy(pos2)
        v[i] = x + eps
        pos_H.append(model(pos1, v))

    for i,x in enumerate(pos1):
        v = copy.copy(pos1)
        v[i] = x - eps
        neg_H.append(model(v, pos2))
    for i,x in enumerate(pos2):
        v = copy.copy(pos2)
        v[i] = x - eps
        neg_H.append(model(pos1, v))
        
    H = (np.vstack(pos_H).T - np.vstack(neg_H).T) / (2*eps)
    return H


def odom_model(pos1, pos2):
#     x1, y1, t1 = pos1
#     x2, y2, t2 = pos2
#     H = np.eye(3)
#     R = rot2D(t2-t1)
#     odom = pos2[:2] - np.dot(R, pos1[:2].reshape(-1,1)).reshape(-1,)
    return pos2-pos1

def odom_jacobian(pos1, pos2):
    x1, y1, t1 = pos1
    x2, y2, t2 = pos2
#     H = np.array([[-math.cos(t2-t1), math.sin(t2-t1), -x1*math.sin(t2-t1) - y1*math.cos(t2-t1),
#                   1, 0, +x1*math.sin(t2-t1) + y1*math.cos(t2-t1)],
#                  [-math.sin(t2-t1), -math.cos(t2-t1), x1*math.cos(t2-t1) - y1*math.sin(t2-t1),
#                   0, 1, -x1*math.cos(t2-t1) + y1*math.sin(t2-t1)],
#                  [0, 0, -1, 0, 0, 1]])
    H = np.zeros((3,6))
    H[0,0] = -1
    H[0,3] = 1
    H[1,1] = -1
    H[1,4] = 1
    H[2,2] = -1
    H[2,5] = 1
    return H

def measurement_model_w(pose_w, plane_w):
    n_w = plane_w[:3].reshape(-1,1)
    d_w = plane_w[-1]
    x,y,theta = pose_w
    h = np.array([x,y,1]).reshape(-1,1)
    n_l = np.dot(rot3D(theta), n_w).reshape(-1,)
    d_l = (np.dot(n_w.T, h) + d_w).reshape(-1,) / np.linalg.norm(n_w)
    return np.hstack([n_l,d_l])

# TODO: pass in normalized n
def invert_measurement_l(pose_w, plane_l):
    n_l = plane_l[:3].reshape(-1,1)
    d_l = plane_l[-1]
    x,y,theta = pose_w
    h = np.array([x,y,1]).reshape(-1,1)
    n_w = np.dot(rot3D(theta).T, n_l).reshape(-1,)
    
    d_w = (-np.dot(n_w.T, h) + d_l).reshape(-1,) 
    return np.hstack([n_w,d_w])

def measurement_jacobian(pose, plane):
    x,y,theta = pose
    nx_w,ny_w,nz_w,d_w = plane
    
    H = np.array([[0, 0, -math.sin(theta)*nx_w - math.cos(theta)*ny_w, math.cos(theta), -math.sin(theta), 0, 0],
                  [0 ,0, math.cos(theta)*nx_w - math.sin(theta)*ny_w, math.sin(theta), math.cos(theta), 0, 0],
                  [0, 0, 0, 0, 0, 1, 0],
                  [nx_w, ny_w, 0, x, y, 1, 1]]) # re-compute normalized jacobian
    
    return H

In [3]:
x0 = np.array([0,0,0])
x1 = np.array([2,3,np.deg2rad(45)])
x2 = np.array([5,5,np.deg2rad(-45)])

plane1 = np.array([0.707, 0.707, 0, 5])
plane2 = np.array([-0.707, 0.707, 0, 2])

gt = np.hstack([x0, x1, plane1, plane2])

def odom_noise():
    return np.hstack([np.random.normal(0, 0.6, 2), np.random.normal(0, 0.1, 1)])

def measurement_noise(std):
    return np.random.normal(0,std, 4)

In [4]:
odom1 = odom_model(x0,x1) + odom_noise()
odom2 = odom_model(x1,x2) + odom_noise()

measurement11 = measurement_model_w(x0, plane1) + measurement_noise(0)
measurement12 = measurement_model_w(x0, plane2) + measurement_noise(0)
measurement21 = measurement_model_w(x1, plane1) + measurement_noise(0)
measurement22 = measurement_model_w(x1, plane2) + measurement_noise(0)
landmark_measurements = np.hstack([measurement11, measurement12, measurement21, measurement22])

n1 = plane1[:3] + np.random.normal(0,0.1, 3)
n1 /= np.linalg.norm(n1)
plane1[:3] = n1

n2 = plane2[:3] + np.random.normal(0,0.1, 3)
n2 /= np.linalg.norm(n2)
plane2[:3] = n2

init_measurement1 = measurement_model_w(x0, plane1)
init_measurement2 = measurement_model_w(x0, plane2)


In [5]:
#std_x = np.array([0.5, 0.5, 0.1]) # x, y, theta
std_x = np.array([0.6, 0.6, .1]) # x, y, theta
#std_l = np.array([1, 1, 1, 1]) # nx, ny, nz, d
std_l = np.array([0.1, 0.1, 0.1, 0.1]) # nx, ny, nz, d

In [6]:
s_x0 = x0
s_l = np.hstack([invert_measurement_l(x0, init_measurement1), invert_measurement_l(x0,init_measurement2)])
print(s_l)
s_x1 = np.hstack([x0, x0+odom1])

s = np.hstack([s_x1, s_l])
print(s)
def generateAB(s_x1, s_l, odom1, landmark_measurements, std_x, std_l, dim_x=3, dim_l=4):
    num_x = int(len(s_x1) / dim_x)
    num_l = int(len(s_l) / dim_l)
    num_l_measurements = int(len(landmark_measurements) / dim_l)
    
    std_p = np.array([0.05, 0.05, 0.005]) # x, y, theta prior
    A_x1 = np.zeros((6,14))
    A_x1[:3,:3] = np.eye(3) / np.sqrt(std_p)
    odom_jac = odom_jacobian(s_x1[0:3], s_x1[3:6]) 
    std_x_repeated = np.tile(std_x, 2)
    A_x1[3:,:6] = odom_jac / np.sqrt(std_x_repeated)

    A_l1 = np.zeros((16,14))
    #measurement_jac1 = measurement_jacobian(s_x1[:3], s_l[:4])
    measurement_jac1 = numeraical_jacobian(s_x1[:3], s_l[:4], measurement_model_w)
    A_l1[:4,0:3] = measurement_jac1[:, 0:3] / np.sqrt(std_x)
    A_l1[:4,6:10] = measurement_jac1[:, 3:7] / np.sqrt(std_l)
    
    #measurement_jac2 = measurement_jacobian(s_x1[:3], s_l[4:])
    measurement_jac2 = numeraical_jacobian(s_x1[:3], s_l[4:], measurement_model_w)
    A_l1[4:8,0:3] = measurement_jac2[:, 0:3] / np.sqrt(std_x)
    A_l1[4:8,10:14] = measurement_jac2[:, 3:7] / np.sqrt(std_l)

    #measurement_jac1 = measurement_jacobian(s_x1[3:], s_l[:4])
    measurement_jac1 = numeraical_jacobian(s_x1[3:], s_l[:4], measurement_model_w)
    A_l1[8:12,3:6] = measurement_jac1[:, 0:3] / np.sqrt(std_x) 
    A_l1[8:12,6:10] = measurement_jac1[:, 3:7] / np.sqrt(std_l)
    #measurement_jac2 = measurement_jacobian(s_x1[3:], s_l[4:])
    measurement_jac2 = numeraical_jacobian(s_x1[3:], s_l[4:], measurement_model_w)
    A_l1[12:16,3:6] = measurement_jac2[:, 0:3] / np.sqrt(std_x)
    A_l1[12:16,10:14] = measurement_jac2[:, 3:7] / np.sqrt(std_l)

    m_x1 = np.hstack([np.array([0,0,0]), odom1])
    p_x1 = np.hstack([np.array([0,0,0]), odom_model(s_x1[:3], s_x1[3:])])
    std_x_repeated = np.tile(std_x, num_x - 1) 
    b_x1 = (m_x1 - p_x1) / np.sqrt(np.hstack([std_p, std_x_repeated]))

    m_l1 = landmark_measurements
    p_l1 = np.hstack([measurement_model_w(s_x1[:3], s_l[:4]), measurement_model_w(s_x1[:3], s_l[4:8]),
                      measurement_model_w(s_x1[3:6], s_l[:4]), measurement_model_w(s_x1[3:6], s_l[4:8])])
    std_l_repeated = np.tile(std_l, num_l_measurements)
    b_l1 = (m_l1 - p_l1) / np.sqrt(std_l_repeated)

    A = np.vstack([A_x1, A_l1])
    b = np.hstack([b_x1, b_l1])
    return A,b

A,b = generateAB(s_x1, s_l, odom1, landmark_measurements, std_x, std_l)
print(A.shape, b.shape, s.shape)

[ 0.69017617  0.71705915 -0.09738085  5.         -0.70914853  0.70496418
  0.01157014  2.        ]
[ 0.          0.          0.          1.87371677  3.27058645  0.80717348
  0.69017617  0.71705915 -0.09738085  5.         -0.70914853  0.70496418
  0.01157014  2.        ]
(22, 14) (22,) (14,)


In [7]:
def gauss_newton(s, odom1, landmark_measurements, std_x, std_l, max_iter=1000):
    for i in range(max_iter):
        s_x1 = s[:6]
        s_l = s[6:]
        A,b = generateAB(s_x1, s_l, odom1, landmark_measurements, std_x, std_l)
        if(i == 1): 
            print("START ERROR: ", np.linalg.norm(b))
        dx = scipy.linalg.solve(np.dot(A.T,A),np.dot(A.T,b))
        s_new = s + dx
        _,b_new = generateAB(s_new[:6], s_new[6:], odom1, landmark_measurements, std_x, std_l)
        if(np.linalg.norm(b_new) > np.linalg.norm(b)):
            print("Error going up - breaking", i)
            return s, np.linalg.norm(b)
        s = s_new
        if(np.linalg.norm(b_new - b) < 1e-12):
            print("Converged",i)
            break
    return s, np.linalg.norm(b_new)

In [8]:
def lm(s, odom1, landmark_measurements, std_x, std_l, max_iter=1000):
    lambda_lm = 10
    
    for i in range(max_iter):
        s_x1 = s[:6]
        s_l = s[6:]
        A,b = generateAB(s_x1, s_l, odom1, landmark_measurements, std_x, std_l)
        dx = scipy.linalg.solve(np.dot(A.T,A) + np.eye(A.shape[1]) * lambda_lm, np.dot(A.T,b))
        s_new = s + dx
        _,b_new = generateAB(s_new[:6], s_new[6:], odom1, landmark_measurements, std_x, std_l)
        if(np.linalg.norm(b_new) > np.linalg.norm(b)):
            lambda_lm *= 10
            continue
        else:
            lambda_lm /= 10
            s = s_new
            
        if(np.linalg.norm(b_new - b) < 1e-12):
            print("Converged",i)
            break
    return s, np.linalg.norm(b_new)

In [9]:
out_g, err_g = gauss_newton(s, odom1, landmark_measurements, std_x, std_l)
out_lm, err_lm = lm(s, odom1, landmark_measurements, std_x, std_l)
print(np.round(s, 2))
print(np.round(out_g,2))
print(np.round(out_lm,2))
print(gt)

print("ACTUAL ERRORS: ")
print(err_g)
print(err_lm)

START ERROR:  0.39444667976575676
Error going up - breaking 58
Converged 186
[ 0.    0.    0.    1.87  3.27  0.81  0.69  0.72 -0.1   5.   -0.71  0.7
  0.01  2.  ]
[ 0.   -0.   -0.06  1.84  3.2   0.74  0.68  0.73  0.    4.97 -0.75  0.66
  0.    1.99]
[ 0.    0.01 -0.06  1.84  3.21  0.74  0.68  0.74  0.    4.96 -0.75  0.66
  0.    1.99]
[ 0.          0.          0.          2.          3.          0.78539816
  0.707       0.707       0.          5.         -0.707       0.707
  0.          2.        ]
ACTUAL ERRORS: 
0.20240596267136238
0.20182583832267523


In [10]:
s_error = np.linalg.norm(gt-s)
og_error = np.linalg.norm(gt-out_g)
olm_error = np.linalg.norm(gt-out_lm)
print(s_error, og_error, olm_error)

0.31567155820080206 0.2832340640953949 0.29075566815634024


In [11]:
print(np.round(abs(gt[:6]-out_g[:6]),3))

[0.    0.    0.058 0.164 0.201 0.048]


In [12]:
print(np.round(abs(gt[:6]-out_lm[:6]),3))

[0.001 0.007 0.06  0.164 0.209 0.049]


In [13]:
print(np.round(abs(gt[:6]-s[:6]),3))

[0.    0.    0.    0.126 0.271 0.022]


In [14]:
import pandas as pd
x_results_pd = pd.DataFrame(np.vstack([s[:6], out_g[:6], out_lm[:6], gt[:6]]))
l_results_pd = pd.DataFrame(np.vstack([s[6:], out_g[6:], out_lm[6:], gt[6:]]))
print("STATE!!!!")
print(x_results_pd)

print("LANDMARKS!!!!")
print(l_results_pd)

STATE!!!!
          0         1         2         3         4         5
0  0.000000  0.000000  0.000000  1.873717  3.270586  0.807173
1  0.000001 -0.000031 -0.058100  1.836313  3.201329  0.737429
2  0.001338  0.007312 -0.059962  1.836265  3.208960  0.736076
3  0.000000  0.000000  0.000000  2.000000  3.000000  0.785398
LANDMARKS!!!!
          0         1             2         3         4         5  \
0  0.690176  0.717059 -9.738085e-02  5.000000 -0.709149  0.704964   
1  0.677996  0.734779  4.193132e-19  4.968419 -0.752935  0.657689   
2  0.676804  0.735781  2.503874e-09  4.961486 -0.753365  0.657213   
3  0.707000  0.707000  0.000000e+00  5.000000 -0.707000  0.707000   

              6         7  
0  1.157014e-02  2.000000  
1  4.410626e-07  1.991847  
2  5.334249e-10  1.988584  
3  0.000000e+00  2.000000  


# TODO:
--------------------------------------------
- programatically create A and b (pipeline)
- visualizations to show convergence or not (2d planes + our pose vs. gt)
- point to plane optimization
--------------------------------------------

--------------------------------------------
- keep points and stich points onto to the plane
- visualize the point cloud
--------------------------------------------

# Note: 
- make sure measurements are normalized planes
- ensure theta std is lower