In [157]:
# install dependencies
# %pip install PyQt5 pyqtgraph

import scipy as sp
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import sympy as sym

from typing import Iterable
import pyqtgraph as pg
from pyqtgraph.Qt import QtCore

In [158]:
# load data
mat = sp.io.loadmat("dataset2.mat")
mat.keys()
# '__header__', '__version__', '__globals__',
# 't', 'r', 'b', 'v', 'om', 'l', 'x_true', 'y_true', 'th_true', 'true_valid',
# 'd', 'r_var', 'b_var', 'v_var', 'om_var'

# ground truth
t = mat["t"]  # 12609 x 1
x_true = mat["x_true"]  # 12609 x 1
y_true = mat["y_true"]  # 12609 x 1
th_true = mat["th_true"]  # 12609 x 1
true_valid = mat["true_valid"]  # 12609 x 1
landmarks = mat["l"]  # 17 x 2 matrix (x,y) of landmark locations
dist = mat[
    "d"
]  # 1 x 1 the distance between the center of the robot and the laser rangeﬁnder

# mesaurements
r = mat["r"]  # 12609 x 17
r_var = mat["r_var"]  # 1 x 1
b = mat["b"]  # 12609 × 17 array containing the bearing
b_var = mat["b_var"]  # 1 x 1
v = mat["v"]  # 12609 × 1 array containing the translational speed
v_var = mat["v_var"]  # 1 x 1
om = mat["om"]  # 12609 × 1 array containing the rotational speed omega
om_var = mat["om_var"]  # 1 x 1

In [159]:
X_true = np.hstack((x_true, y_true, th_true)).reshape(-1, 3)
u = np.hstack((v, om)).reshape(-1, 2)
Y_true = np.hstack((r, b)).reshape(-1, 17, 2)
Q = np.diag(
    [
        v_var.item(),
        om_var.item(),
    ]
)
R = np.diag([r_var.item(), b_var.item()])
T = 0.1
d = dist.item()

# initial state
X_0 = X_true[0]
P_0 = np.diag([1, 1, 0.1])

X_hat = X_0
P_hat = P_0

# input

k = 1
l = 15
## sensor reading
u_k = u[k]

v_k = u_k[0]
om_k = u_k[1]

x_hat = X_hat[0]
y_hat = X_hat[1]
th_hat = X_hat[2]

## landmark
landmark = landmarks[l]
x_l = landmark[0]
y_l = landmark[1]

# prediction

## H
H = np.matrix(
    [[1, 0, -T * np.sin(th_hat) * v_k], [0, 1, T * np.cos(th_hat) * v_k], [0, 0, 1]]
)

## Q_prime
J_h_w = np.matrix([[T * np.cos(th_hat), 0], [T * np.sin(th_hat), 0], [0, T]])
Q_prime = J_h_w @ Q @ J_h_w.T

## P_check
P_check = H @ P_hat @ H.T + Q_prime

## X_check
x_k = x_hat + T * np.cos(th_hat) * v_k
y_k = y_hat + T * np.sin(th_hat) * v_k
th_k = th_hat + T * om_k
X_check = np.array([x_k, y_k, th_k])

## G
a = x_l - x_k - d * np.cos(th_k)
b = y_l - y_k - d * np.sin(th_k)
c = a ** 2 + b ** 2 

G = np.zeros((2, 3))
G[0,0] = - a / np.sqrt(c)
G[0,1] = - b / np.sqrt(c)
G[0,2] = - d * b * np.cos(th_k) + d * a * np.sin(th_k) / np.sqrt(c)
G[1,0] = b / c 
G[1,1] = - a / c
G[1,2] = - 1 - d * b * np.sin(th_k) / c - d * a * np.cos(th_k) / c

print(G)

## R_prime
R_prime = R

## K
K = P_check @ G.T @ np.linalg.inv(G @ P_check @ G.T + R_prime)

# correction
## P_hat
P_hat = (np.eye(3) - K @ G) @ P_check

## X_hat
r_k_l = np.sqrt(c)
b_k_l = np.arctan2(b, a) - th_k

Y_pred = np.array([r_k_l, b_k_l])
X_hat = X_check + K @ (Y_true[k,l] - Y_pred)

print(Y_true[k,l])
print(Y_pred)

print(X_check, X_hat, X_true[k])
# print(P_check, P_hat)

[[-0.00330931 -0.99999452  0.57838324]
 [ 0.36845894 -0.00121935 -0.98122546]]
[-0.25157072 -0.93364939]
[2.71399178 4.47758835]
[ 3.02191105  0.07140687 -2.91010134] [[-5.90457878  4.30104014 -0.76915996]] [ 3.01960641  0.0709297  -2.91005219]


In [160]:
# compute G
x_k, y_k, theta_k, x_l, y_l, d = sym.symbols("x_k y_k theta_k x_l y_l d")

# Define the observation function g(x_k, n_k^l)
g_r = sym.sqrt(
    (x_l - x_k - d * sym.cos(theta_k)) ** 2 + (y_l - y_k - d * sym.sin(theta_k)) ** 2
)
g_phi = (
    sym.atan2(y_l - y_k - d * sym.sin(theta_k), x_l - x_k - d * sym.cos(theta_k))
    - theta_k
)

# Compute the Jacobian matrix of g with respect to x_k, y_k, theta_k

J_g = [
    [sym.diff(g_r, x_k), sym.diff(g_r, y_k), sym.diff(g_r, theta_k)],
    [sym.diff(g_phi, x_k), sym.diff(g_phi, y_k), sym.diff(g_phi, theta_k)],
]

print(J_g)

[[(d*cos(theta_k) + x_k - x_l)/sqrt((-d*sin(theta_k) - y_k + y_l)**2 + (-d*cos(theta_k) - x_k + x_l)**2), (d*sin(theta_k) + y_k - y_l)/sqrt((-d*sin(theta_k) - y_k + y_l)**2 + (-d*cos(theta_k) - x_k + x_l)**2), (-d*(-d*sin(theta_k) - y_k + y_l)*cos(theta_k) + d*(-d*cos(theta_k) - x_k + x_l)*sin(theta_k))/sqrt((-d*sin(theta_k) - y_k + y_l)**2 + (-d*cos(theta_k) - x_k + x_l)**2)], [-(d*sin(theta_k) + y_k - y_l)/((-d*sin(theta_k) - y_k + y_l)**2 + (-d*cos(theta_k) - x_k + x_l)**2), -(-d*cos(theta_k) - x_k + x_l)/((-d*sin(theta_k) - y_k + y_l)**2 + (-d*cos(theta_k) - x_k + x_l)**2), d*(d*sin(theta_k) + y_k - y_l)*sin(theta_k)/((-d*sin(theta_k) - y_k + y_l)**2 + (-d*cos(theta_k) - x_k + x_l)**2) - d*(-d*cos(theta_k) - x_k + x_l)*cos(theta_k)/((-d*sin(theta_k) - y_k + y_l)**2 + (-d*cos(theta_k) - x_k + x_l)**2) - 1]]


$$\left[\begin{matrix}\frac{d \cos{\left(\theta_{k} \right)} + x_{k} - x_{l}}{\sqrt{\left(- d \sin{\left(\theta_{k} \right)} - y_{k} + y_{l}\right)^{2} + \left(- d \cos{\left(\theta_{k} \right)} - x_{k} + x_{l}\right)^{2}}} & \frac{d \sin{\left(\theta_{k} \right)} + y_{k} - y_{l}}{\sqrt{\left(- d \sin{\left(\theta_{k} \right)} - y_{k} + y_{l}\right)^{2} + \left(- d \cos{\left(\theta_{k} \right)} - x_{k} + x_{l}\right)^{2}}} & \frac{- d \left(- d \sin{\left(\theta_{k} \right)} - y_{k} + y_{l}\right) \cos{\left(\theta_{k} \right)} + d \left(- d \cos{\left(\theta_{k} \right)} - x_{k} + x_{l}\right) \sin{\left(\theta_{k} \right)}}{\sqrt{\left(- d \sin{\left(\theta_{k} \right)} - y_{k} + y_{l}\right)^{2} + \left(- d \cos{\left(\theta_{k} \right)} - x_{k} + x_{l}\right)^{2}}}\\- \frac{d \sin{\left(\theta_{k} \right)} + y_{k} - y_{l}}{\left(- d \sin{\left(\theta_{k} \right)} - y_{k} + y_{l}\right)^{2} + \left(- d \cos{\left(\theta_{k} \right)} - x_{k} + x_{l}\right)^{2}} & - \frac{- d \cos{\left(\theta_{k} \right)} - x_{k} + x_{l}}{\left(- d \sin{\left(\theta_{k} \right)} - y_{k} + y_{l}\right)^{2} + \left(- d \cos{\left(\theta_{k} \right)} - x_{k} + x_{l}\right)^{2}} & \frac{d \left(d \sin{\left(\theta_{k} \right)} + y_{k} - y_{l}\right) \sin{\left(\theta_{k} \right)}}{\left(- d \sin{\left(\theta_{k} \right)} - y_{k} + y_{l}\right)^{2} + \left(- d \cos{\left(\theta_{k} \right)} - x_{k} + x_{l}\right)^{2}} - \frac{d \left(- d \cos{\left(\theta_{k} \right)} - x_{k} + x_{l}\right) \cos{\left(\theta_{k} \right)}}{\left(- d \sin{\left(\theta_{k} \right)} - y_{k} + y_{l}\right)^{2} + \left(- d \cos{\left(\theta_{k} \right)} - x_{k} + x_{l}\right)^{2}} - 1\end{matrix}\right]$$

In [161]:
# EKF Estimatior
class EKF:
    def __init__(self, x0_hat, P0_hat, Q, R, T):
        self.x_hat = x0_hat
        self.P_hat = P0_hat
        self.x_check = None
        self.P_check = None
        self.Q = Q
        self.R = R
        self.T = T
        self.x_true = None

    def predict(self, u, using_x_true=False):
        if using_x_true:
            if self.x_true is None:
                raise ValueError("x_true is not given")
            else:
                x_op = self.x_true
        else:
            x_op = self.x_hat

        # compute Jacobian h to x and w

        # compute x_check and P_check

    def correct(self, y):
        pass

    def get_result(self):
        return self.x_hat, self.P_hat

    def given_x_true(self, x_true):
        self.x_true = x_true

In [162]:
# Animation
class Animation:
    def __init__(self, landmarks: Iterable[float]) -> None:
        # window
        self.win = pg.plot(show=True)
        self.win.resize(800, 600)
        self.win.setWindowTitle("Animation Ex03 - x1 speed")
        self.win.setBackground("w")
        self.win.setAspectLocked(lock=True, ratio=1)
        self.win.getViewBox().wheelEvent = lambda event: None
        self.win.setXRange(-2, 10)
        self.win.setYRange(-3, 4)

        # colors
        color_landmarks = pg.mkColor(10, 10, 10)  # black
        color_landmarks.setAlpha(255)

        color_pos_true = pg.mkColor(0, 114, 189)  # blue
        color_pos_true.setAlpha(220)

        color_pos_est = pg.mkColor(252, 41, 30)  # red
        color_pos_est.setAlpha(220)

        color_ellipse = pg.mkColor(252, 41, 30)  # red
        color_ellipse.setAlpha(60)

        # landmarks
        self.landmarks = pg.ScatterPlotItem()
        self.landmarks.setPen(color_landmarks)
        self.landmarks.setBrush(color_landmarks)
        self.landmarks.setData(x=landmarks[:, 0], y=landmarks[:, 1])

        # true position
        self.pos_true = pg.ScatterPlotItem()
        self.pos_true.setSize(5)
        self.pos_true.setPen(color_pos_true)
        self.pos_true.setBrush(color_pos_true)

        # estimated position
        self.pos_est = pg.ScatterPlotItem()
        self.pos_est.setSize(5)
        self.pos_est.setPen(color_pos_est)
        self.pos_est.setBrush(color_pos_est)

        # uncertainty ellipse
        self.ellipse = pg.PlotCurveItem()
        self.ellipse.setPen(color_ellipse, width=4)
        self.ellipse.setBrush(color_ellipse)
        self.ellipse.setFillLevel(0)

        # legend
        self.legend = pg.LegendItem()
        self.legend.setParentItem(self.win.graphicsItem())
        self.legend.setOffset((20, 1))
        self.legend.addItem(self.landmarks, "Landmarks")
        self.legend.addItem(self.pos_true, "True position")
        self.legend.addItem(self.pos_est, "Estimated position")
        self.legend.addItem(self.ellipse, "Uncertainty ellipse")

        self.win.addItem(self.landmarks)
        self.win.addItem(self.pos_true)
        self.win.addItem(self.pos_est)
        self.win.addItem(self.ellipse)

    def compute_ellipse(
        self, mean: Iterable[float], covariance: Iterable[float]
    ) -> Iterable[float]:
        eigenvalues, eigenvectors = np.linalg.eig(covariance)
        angle = np.arctan2(eigenvectors[1, 0], eigenvectors[0, 0])
        width, height = 2 * np.sqrt(eigenvalues)
        t = np.linspace(0, 2 * np.pi, 50)
        ell = np.array([width / 2 * np.cos(t), height / 2 * np.sin(t)])
        rot_matrix = np.array(
            [[np.cos(angle), -np.sin(angle)], [np.sin(angle), np.cos(angle)]]
        )
        rotated_ell = rot_matrix @ ell
        rotated_ell[0] += mean[0]
        rotated_ell[1] += mean[1]
        return rotated_ell

    def update(
        self, mean: Iterable[float], covariance: Iterable[float], true_position=None
    ) -> None:
        if true_position is not None:
            self.pos_true.setData([true_position[0]], [true_position[1]])

        self.pos_est.setData([mean[0]], [mean[1]])

        ellipse = self.compute_ellipse(mean, covariance)
        self.ellipse.setData(ellipse[0], ellipse[1])

In [163]:
# test
# l = sp.io.loadmat("dataset2.mat")["l"]
# animation = Animation(l)

# mean = [1, 1]
# cov = [[2, 1], [1, 2]]

# timer = QtCore.QTimer()
# timer.timeout.connect(
#     lambda: animation.update(mean, cov, true_position=[mean[0] + 1, mean[1] + 1])
# )
# timer.start(1000)
# pg.exec()