Skip to content

Commit

Permalink
Refactor operations to fit EKF to original formulations.
Browse files Browse the repository at this point in the history
  • Loading branch information
Mayitzin committed Jan 2, 2021
1 parent 8cc417d commit 0f987bc
Showing 1 changed file with 70 additions and 46 deletions.
116 changes: 70 additions & 46 deletions ahrs/filters/ekf.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
"""

import numpy as np
from ..common.orientation import *
from ..common.mathfuncs import *
from ..common.orientation import q2R
from ..common.mathfuncs import skew
from ..common import DEG2RAD

class EKF:
Expand All @@ -36,20 +36,23 @@ class EKF:
Default values: :math:`\\sigma = \\begin{bmatrix} 0.3^2 & 0.5^2 & 0.8^2 \\end{bmatrix}`
"""
def __init__(self, gyr: np.ndarray = None, acc: np.ndarray = None, mag: np.ndarray = None, **kwargs):
def __init__(self,
gyr: np.ndarray = None,
acc: np.ndarray = None,
mag: np.ndarray = None,
frequency: float = 100.0, **kwargs):
self.gyr = gyr
self.acc = acc
self.mag = mag
self.frequency = kwargs.get('frequency', 100.0)
self.frequency = frequency
self.Dt = kwargs.get('Dt', 1.0/self.frequency)
self.q0 = kwargs.get('q0')
self.noises = kwargs.get('noises', [0.3**2, 0.5**2, 0.8**2])
self.g_noise = self.noises[0]*np.identity(3)
self.a_noise = self.noises[1]*np.identity(3)
self.m_noise = self.noises[2]*np.identity(3)
self.R = np.diag(np.repeat(self.noises[1:], 3))
self.P = np.identity(4)
# Process of data is given
if self.acc is not None and self.gyr is not None:
if all([x is not None for x in [self.gyr, self.acc, self.mag]]):
self.Q = self._compute_all()

def _compute_all(self):
Expand All @@ -76,24 +79,50 @@ def _compute_all(self):
Q[t] = self.update(Q[t-1], self.gyr[t], self.acc[t], self.mag[t])
return Q

def jacobian(self, q: np.ndarray, v: np.ndarray) -> np.ndarray:
def dfdq(self, omega):
"""Linearization of state with partial derivative (Jacobian.)
Parameters
----------
omega : numpy.ndarray
Angular velocity in rad/s.
Returns
-------
F : numpy.ndarray
Jacobian of state.
"""
Jacobian of vector :math:`\\mathbf{v}` with respect to quaternion :math:`\\mathbf{q}`.
f = [1.0, *0.5*self.Dt*omega]
return np.array([
[f[0], -f[1], -f[2], -f[3]],
[f[1], f[0], -f[3], f[2]],
[f[2], f[3], f[0], -f[1]],
[f[3], -f[2], f[1], f[0]]])

def dhdq(self, q, v):
"""Linearization of observations with partial derivative (Jacobian.)
Parameters
----------
q : numpy.ndarray
Quaternion.
Predicted state estimate.
v : numpy.ndarray
vector to build a Jacobian from.
Observations.
Returns
-------
J : numpy.ndarray
Jacobian of observations.
"""
qw, qx, qy, qz = q
vx, vy, vz = v
J = 2.0*np.array([[qy*vz - qz*vy, qy*vy + qz*vz, qw*vz + qx*vy - 2.0*qy*vx, qx*vz - qw*vy - 2.0*qz*vx],
[qz*vx - qx*vz, qy*vx - 2.0*qx*vy - qw*vz, qx*vx + qz*vz, qw*vx + qy*vz - 2.0*qz*vy],
[qx*vy - qy*vx, qw*vy - 2.0*qx*vz + qz*vx, qz*vy - 2.0*qy*vz - qw*vx, qx*vx + qy*vy]])
return J
J = np.array([
[qy*v[2] - qz*v[1], qy*v[1] + qz*v[2], qw*v[2] + qx*v[1] - 2.0*qy*v[0], qx*v[2] - qw*v[1] - 2.0*qz*v[0]],
[qz*v[0] - qx*v[2], qy*v[0] - 2.0*qx*v[1] - qw*v[2], qx*v[0] + qz*v[2], qw*v[0] + qy*v[2] - 2.0*qz*v[1]],
[qx*v[1] - qy*v[0], qw*v[1] - 2.0*qx*v[2] + qz*v[0], qz*v[1] - 2.0*qy*v[2] - qw*v[0], qx*v[0] + qy*v[1]],
[qy*v[5] - qz*v[4], qy*v[4] + qz*v[5], qw*v[5] + qx*v[4] - 2.0*qy*v[3], qx*v[5] - qw*v[4] - 2.0*qz*v[3]],
[qz*v[3] - qx*v[5], qy*v[3] - 2.0*qx*v[4] - qw*v[5], qx*v[3] + qz*v[5], qw*v[3] + qy*v[5] - 2.0*qz*v[4]],
[qx*v[4] - qy*v[3], qw*v[4] - 2.0*qx*v[5] + qz*v[3], qz*v[4] - 2.0*qy*v[5] - qw*v[3], qx*v[3] + qy*v[4]]])
return 2.0*J

def update(self, q: np.ndarray, gyr: np.ndarray, acc: np.ndarray, mag: np.ndarray) -> np.ndarray:
"""
Expand All @@ -102,23 +131,25 @@ def update(self, q: np.ndarray, gyr: np.ndarray, acc: np.ndarray, mag: np.ndarra
Parameters
----------
q : numpy.ndarray
A-priori quaternion.
A-priori orientation as quaternion.
gyr : numpy.ndarray
Sample of tri-axial Gyroscope in rad/s.
acc : numpy.ndarray
Sample of tri-axial Accelerometer in m/s^2.
mag : numpy.ndarray
Sample of tri-axial Magnetometer in mT.
Sample of tri-axial Magnetometer in uT.
Returns
-------
q : numpy.ndarray
Estimated (A-posteriori) quaternion.
Estimated a-posteriori orientation as quaternion.
"""
g = gyr.copy()
a = acc.copy()
m = mag.copy()
if not np.isclose(np.linalg.norm(q), 1.0):
raise ValueError("A-priori quaternion must have norm equal to 1.")
g = np.copy(gyr)
a = np.copy(acc)
m = np.copy(mag)
# handle NaNs
a_norm = np.linalg.norm(a)
if a_norm == 0:
Expand All @@ -129,30 +160,23 @@ def update(self, q: np.ndarray, gyr: np.ndarray, acc: np.ndarray, mag: np.ndarra
# Normalize vectors
a /= a_norm
m /= m_norm
q /= np.linalg.norm(q)

# ----- Prediction -----
# Approximate apriori quaternion
F = q_mult_L(np.insert(0.5*self.Dt*g, 0, 1.0))
q_apriori = F@q
# Estimate apriori Covariance Matrix
E = np.vstack((-q[1:], skew(q[1:]) + q[0]*np.identity(3)))
Qk = 0.25*self.Dt**2 * (E@self.g_noise@E.T)
P_apriori = F@self.P@F.T + Qk

# Linearize apriori quaternion
F = self.dfdq(g)
q_k = F@q
# Estimate apriori Covariance Matrix (P_k)
E = np.r_[[-q[1:]], skew(q[1:]) + q[0]*np.identity(3)]
Q_k = 0.25*self.Dt**2 * (E@self.g_noise@E.T)
P_k = F@self.P@F.T + Q_k
# ----- Correction -----
q_apriori_conj = q_conj(q_apriori)
z = np.concatenate((q2R(q_apriori_conj)@m, q2R(q_apriori_conj)@a))
H = np.vstack((self.jacobian(q_apriori_conj, m), self.jacobian(q_apriori_conj, a)))
R = np.zeros((6, 6))
R[:3, :3] = self.m_noise
R[3:, 3:] = self.a_noise
K = P_apriori@H.T@np.linalg.inv(H@P_apriori@H.T + R)
q = q_apriori + K@z
P = (np.identity(4) - K@H)@P_apriori

self.q = q/np.linalg.norm(q)
self.P = P

dcm_k = q2R(q_k)
z = np.r_[dcm_k@a, dcm_k@m]
# Linearize Observations
H = self.dhdq(q_k, np.r_[a, m])
v = z - H@q_k
S = H@P_k@H.T + self.R
K = P_k@H.T@np.linalg.inv(S)
self.P = (np.identity(4) - K@H)@P_k
self.q = q_k + K@v
self.q /= np.linalg.norm(self.q)
return self.q

0 comments on commit 0f987bc

Please sign in to comment.