In [1]:
import numpy as np
import matplotlib.pylab as plt
import matplotlib.gridspec as gridspec
import pandas as pd

from bagpy import bagreader

In [2]:
# import data from recording at Klitten FVZ
results_file = "./Klitten/loc_results_2023-11-01-12-53-52_0.bag"
sensor_file = "./Klitten/recordedData_sensors_locpc_2023-11-01-12-53-52_0.bag"

result_reader = bagreader(results_file)
sensor_reader = bagreader(sensor_file)

[INFO]  Data folder ./Klitten/loc_results_2023-11-01-12-53-52_0 already exists. Not creating.
[INFO]  Data folder ./Klitten/recordedData_sensors_locpc_2023-11-01-12-53-52_0 already exists. Not creating.


In [3]:
# Thius function extracts the sensordata into CSV-files

map_path = result_reader.message_by_topic(topic="/odometry/filtered/map")
odom_path = result_reader.message_by_topic(topic="/odometry/filtered/odom")
gps_odometry_path  = result_reader.message_by_topic(topic="/odometry/gps")
imu_oem = sensor_reader.message_by_topic(topic="/localization_pc/can0/decoded/imu")
speed_oem = sensor_reader.message_by_topic(topic="/localization_pc/can0/decoded/speed")
steeringAngle_oem = sensor_reader.message_by_topic(topic="/localization_pc/can0/decoded/steeringAngle")
gnss_oem = sensor_reader.message_by_topic(topic="/localization_pc/can0/decoded/gnss")
imu_anavs = sensor_reader.message_by_topic(topic="/localization_pc/sensor/anavs/imu")
navsat_anavs = sensor_reader.message_by_topic(topic="/localization_pc/sensor/anavs/navsat")

In [4]:
# You can then use Pandas to read the files.
df_map_path = pd.read_csv(map_path)  
df_odom_path = pd.read_csv(odom_path)  
df_gps_path = pd.read_csv(gps_odometry_path)
df_speed = pd.read_csv(speed_oem)
df_imu = pd.read_csv(imu_oem)

# 我写的
df_imu_anavs = pd.read_csv(imu_anavs)
df_navsat_anvas = pd.read_csv(navsat_anavs)

# 1. Design the State Variables

$$
\mathbf{X} =
\begin{bmatrix}
x \\
y \\
\theta \\
v \\
\omega
\end{bmatrix}
$$

- **\([x, y]\)**: Position of the vehicle  
- **\(\theta\)**: Heading angle (\(^\circ\))  
- **\(\mathbf{v}\)**: Speed (\([m/s]\))  
- **\(\omega\)**: Yaw rate (\([rad/s]\))  


# 2. Design the System Model

## Jacobian Matrix checkup 

In [37]:
from sympy import symbols, Matrix, sin, cos, init_printing

# initialize to show in the form of LaTex
init_printing(use_latex="mathjax", fontsize="16pt")

# define the symbles of variables
x, y, theta, v, omega, T = symbols('x y theta v omega T')  

# define the state vector
X = Matrix([x, y, theta, v, omega])  

# define state transition f(x)
f_x = Matrix([
    x + v/omega * (sin(theta + omega*T) - sin(theta)), 
    y + v/omega * (cos(theta) - cos(theta + omega*T)),  
    theta + omega*T,
    v,
    omega  
])

# calculate jacobian matrix J
J = f_x.jacobian(X)

J

⎡      v⋅(-cos(θ) + cos(T⋅ω + θ))  -sin(θ) + sin(T⋅ω + θ)  T⋅v⋅cos(T⋅ω + θ)   v⋅(-sin(θ) + sin(T⋅ω + θ))⎤
⎢1  0  ──────────────────────────  ──────────────────────  ──────────────── - ──────────────────────────⎥
⎢                  ω                         ω                    ω                        2            ⎥
⎢                                                                                         ω             ⎥
⎢                                                                                                       ⎥
⎢      v⋅(-sin(θ) + sin(T⋅ω + θ))  cos(θ) - cos(T⋅ω + θ)   T⋅v⋅sin(T⋅ω + θ)   v⋅(cos(θ) - cos(T⋅ω + θ)) ⎥
⎢0  1  ──────────────────────────  ─────────────────────   ──────────────── - ───────────────────────── ⎥
⎢                  ω                         ω                    ω                       2             ⎥
⎢                                                                                        ω              ⎥
⎢                                             

In [30]:
# reduce common expressions
R = symbols('R')
J = J.subs(v/omega, R)
J

⎡                                  -sin(θ) + sin(T⋅ω + θ)                     R⋅(-sin(θ) + sin(T⋅ω + θ))⎤
⎢1  0  R⋅(-cos(θ) + cos(T⋅ω + θ))  ──────────────────────  R⋅T⋅cos(T⋅ω + θ) - ──────────────────────────⎥
⎢                                            ω                                            ω             ⎥
⎢                                                                                                       ⎥
⎢                                  cos(θ) - cos(T⋅ω + θ)                      R⋅(cos(θ) - cos(T⋅ω + θ)) ⎥
⎢0  1  R⋅(-sin(θ) + sin(T⋅ω + θ))  ─────────────────────   R⋅T⋅sin(T⋅ω + θ) - ───────────────────────── ⎥
⎢                                            ω                                            ω             ⎥
⎢                                                                                                       ⎥
⎢0  0              1                         0                                   T                      ⎥
⎢                                             

# 3. Design the Measurement Model

In [34]:
# Die Sensoren können direkt die Messdaten der Variablen liefern
z = Matrix([x, y, theta, v, omega])
H = z.jacobian(X)
H

⎡1  0  0  0  0⎤
⎢             ⎥
⎢0  1  0  0  0⎥
⎢             ⎥
⎢0  0  1  0  0⎥
⎢             ⎥
⎢0  0  0  1  0⎥
⎢             ⎥
⎣0  0  0  0  1⎦

# 4. Design Measurement Noise

# 5. Implementation 

In [35]:
from filterpy.kalman import ExtendedKalmanFilter as EKF
from numpy import array, sqrt
class CTRV_EKF(EKF):
    def __init__(self, dt, Q):  # dt is value for specific timestep
        # state dimension 5, measurement dimension 5, control input dimension 0
        EKF.__init__(self, 5, 5, 0)

        self.dt = dt
        self.Q = Q  # Processnoise covariance matrix
        # symble definition
        x, y, theta, v, omega, time = symbols('x y theta v omega T')  # "T" is symble variable for symble expression, "time" is the saved reference-name of "T"
        # "="前面的x, y 是通过 sympy.symbols('x y ...') 定义的符号变量; 这些符号变量是用来进行符号运算（如雅可比矩阵计算）的数学符号; 这些符号变量会被赋值到 self.x_x 和 self.x_y
        # "="后面是 sympy.symbols 定义的字符串 'x y'，仅仅是符号定义的文本，不涉及变量引用
        
        R = v/omega

        # state vector
        X = Matrix([x, y, theta, v, omega]) 
        
        # nonlinear system model    
        self.f_x = Matrix([
            x + v/omega * (sin(theta + omega*time) - sin(theta)), 
            y + v/omega * (cos(theta) - cos(theta + omega*time)),  
            theta + omega*time,
            v,
            omega
        ])

        # calculate jacobian matrix
        self.F_J = self.f_x.jacobian(X)

        # replace dictionary and  it's variables
        self.subs                      = {x: 0, y: 0, theta: 0, v: 0, omega: 0, time: dt}  # 等号后面的x, y 是替换字典 self.subs 的键。
        self.x_x, self.x_y,            = x, y                                             # 引用的是symble definition中的符号变量 x 和 y
        self.v, self.omega, self.theta = v, omega, theta


        def predict(self):
            self.X                = self.driving(self.X, self.dt)  # self.X after "="：the current state vector; self.X before "="：the next state vector
            self.subs[self.x_x]   = self.X[0, 0]
            self.subs[self.x_y]   = self.X[1, 0]
            self.subs[self.theta] = self.x[2, 0]
            self.subs[self.v]     = self.x[3, 0]
            self.subs[self.omega] = self.x[4, 0]

            # predict the value of Jacobian Matrix
            J = array(self.F_J.evalf(subs=self.subs)).astype(float)

            self.P = F @ self.P @ F.T