Adding the IPLF components and tests
A-acuto committed May 15, 2024
1 parent 6791a6d commit b5cf459
Showing 6 changed files with 423 additions and 8 deletions.
29 changes: 29 additions & 0 deletions stonesoup/functions/__init__.py
@@ -796,3 +796,32 @@ def sde_euler_maruyama_integration(fun, t_values, state_x0):
a, b = fun(state_x, t)
state_x.state_vector = state_x.state_vector + a*delta_t + b@delta_w
return state_x.state_vector


def slr_definition(state_pdf, prediction, force_symmetry=False):
""" Statistical linear regression (SLR), adapts the definition (9)-(11) as found in
Á. F. García-Fernández, L. Svensson and S. Särkkä, "Iterated Posterior Linearization Smoother,"
in IEEE Transactions on Automatic Control, vol. 62, no. 4, pp. 2056-2063, April 2017, doi: 10.1109/TAC.2016.2592681.
"""

# First two moments of the state pdf
x_bar = state_pdf.state_vector
p_matrix = state_pdf.covar

# The predicted quantities wrt the state pdf (e.g. using sigma points)
z_bar = prediction.state_vector.astype(float)
# The cross-covariance (psi) is natively available in GaussianMeasurementPrediction for measurement
# predictions, and has been introduced in AugmentedGaussianStatePrediction for state predictions
psi = prediction.cross_covar
phi = prediction.covar

# Statistical linear regression parameters of a function predicted with the quantities above
H_plus = psi.T @ np.linalg.pinv(p_matrix)
b_plus = z_bar - H_plus @ x_bar
Omega_plus = phi - H_plus @ p_matrix @ H_plus.T

if force_symmetry:
Omega_plus = (Omega_plus + Omega_plus.T) / 2

# The output is the function's SLR with respect to the state_pdf
return H_plus, b_plus, Omega_plus
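
For reference, the three quantities returned above are the SLR parameters given in (9)-(11) of the cited paper; with P the covariance of the state pdf, Psi the state-measurement cross-covariance and Phi the predicted covariance, they read (in LaTeX notation):

    H^{+} = \Psi^{\top} P^{-1}, \qquad b^{+} = \bar{z} - H^{+} \bar{x}, \qquad \Omega^{+} = \Phi - H^{+} P (H^{+})^{\top}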
36 changes: 34 additions & 2 deletions stonesoup/functions/tests/test_functions.py
@@ -3,12 +3,16 @@
from numpy import deg2rad
from scipy.linalg import cholesky, LinAlgError
from pytest import approx, raises
import datetime

from .. import (
cholesky_eps, jacobian, gm_reduce_single, mod_bearing, mod_elevation, gauss2sigma,
rotx, roty, rotz, cart2sphere, cart2angles, pol2cart, sphere2cart, dotproduct, gm_sample)
from ...types.array import StateVector, StateVectors, Matrix
rotx, roty, rotz, cart2sphere, cart2angles, pol2cart, sphere2cart, dotproduct, gm_sample,
slr_definition)
from ...types.array import StateVector, StateVectors, Matrix, CovarianceMatrix
from ...types.state import State, GaussianState
from ...types.prediction import GaussianMeasurementPrediction
from ...types.update import GaussianStateUpdate


def test_cholesky_eps():
@@ -344,3 +348,31 @@ def test_gm_sample(means, covars, weights, size):
assert samples.shape[0] == means[0].shape[0]
else:
assert samples.shape[0] == means.shape[0]


def test_slr_definition():
time1 = datetime.datetime.now()

posterior_state = GaussianStateUpdate(
state_vector=np.array([[7.77342961], [1.]]),
covar=CovarianceMatrix([[4.54274216e+00, -8.47168858e-16],
[-8.47168858e-16, 2.22044604e-16]]),
hypothesis=None,
timestamp=time1)

prediction_state = GaussianMeasurementPrediction(
state_vector=[[28.7828]],
covar=[[812.403]],
timestamp=time1,
cross_covar=CovarianceMatrix([[4.94297048e+01],
[-9.10320262e-15]]))

h_matrix, b_vector, omega_cov_matrix = slr_definition(posterior_state,
prediction_state,
force_symmetry=True)
assert np.allclose(h_matrix,
np.array([[10.8810, -2.029e-15]]))
assert np.allclose(b_vector,
np.array([[-55.8001]]))
assert np.allclose(omega_cov_matrix,
np.array([[274.557]]))
77 changes: 76 additions & 1 deletion stonesoup/models/measurement/linear.py
@@ -1,9 +1,10 @@
import numpy as np

from ...base import Property
from ...types.array import CovarianceMatrix
from ...types.array import CovarianceMatrix, Matrix
from ..base import LinearModel, GaussianModel
from .base import MeasurementModel
from ...types.state import StateVector


# TODO: Probably should call this LinearGaussianMeasurementModel
@@ -95,3 +96,77 @@ def covar(self, **kwargs):
"""

return self.noise_covar


class GeneralLinearGaussian(MeasurementModel, LinearModel, GaussianModel):
""" Generalised linear Gaussian measurement model, used by the Iterated Posterior
Linearisation Filter (IPLF) implementation.
"""

meas_matrix: Matrix = Property(doc="Measurement matrix")
bias_value: StateVector = Property(doc="Bias value")
noise_covar: CovarianceMatrix = Property(doc="Noise covariance")

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
if not isinstance(self.meas_matrix, Matrix):
self.meas_matrix = Matrix(self.meas_matrix)
if not isinstance(self.bias_value, StateVector):
self.bias_value = StateVector(self.bias_value)
if not isinstance(self.noise_covar, CovarianceMatrix):
self.noise_covar = CovarianceMatrix(self.noise_covar)

@property
def ndim_meas(self):
"""ndim_meas getter method
Returns
-------
:class:`int`
The number of measurement dimensions
"""

return len(self.mapping)

def matrix(self, **kwargs):
return self.meas_matrix

def bias(self, **kwargs):
return self.bias_value

def covar(self, **kwargs):
"""Returns the measurement model noise covariance matrix.
Returns
-------
:class:`~.CovarianceMatrix` of shape\
(:py:attr:`~ndim_meas`, :py:attr:`~ndim_meas`)
The measurement noise covariance.
"""

return self.noise_covar

def function(self, state, noise=False, **kwargs):
"""Model function :math:`h(t,x(t),w(t))`
Parameters
----------
state: :class:`~.State`
An input state
noise: :class:`numpy.ndarray` or bool
An externally generated random process noise sample (the default is
`False`, in which case no noise will be added;
if `True`, the output of :meth:`~.Model.rvs` is added)
Returns
-------
:class:`numpy.ndarray` of shape (:py:attr:`~ndim_meas`, 1)
The model function evaluated given the provided time interval.
"""

if isinstance(noise, bool) or noise is None:
if noise:
noise = self.rvs(num_samples=state.state_vector.shape[1], **kwargs)
else:
noise = 0

return self.matrix(**kwargs) @ state.state_vector + self.bias_value + noise
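
For orientation, a minimal usage sketch of the class above (the numbers are illustrative assumptions, not part of this commit): a 1D measurement of a 2D state, evaluated through the affine function h(x) = Hx + b.

    import numpy as np

    from stonesoup.models.measurement.linear import GeneralLinearGaussian
    from stonesoup.types.state import State

    # Illustrative values only: 1D measurement of a 2D state
    model = GeneralLinearGaussian(ndim_state=2,
                                  mapping=[0],
                                  meas_matrix=np.array([[1., 0.]]),
                                  bias_value=np.array([[0.5]]),
                                  noise_covar=np.array([[0.1]]))

    state = State(np.array([[1.], [2.]]))
    print(model.function(state))              # [[1.5]], i.e. H @ x + b with no noise added
    print(model.function(state, noise=True))  # adds a sample drawn from N(0, noise_covar)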
103 changes: 102 additions & 1 deletion stonesoup/models/measurement/tests/test_lg.py
@@ -3,7 +3,8 @@
import numpy as np
from scipy.stats import multivariate_normal

from ..linear import LinearGaussian
from ..linear import LinearGaussian, GeneralLinearGaussian
from ....types.array import CovarianceMatrix, Matrix
from ....types.state import State


@@ -106,3 +107,103 @@ def test_lgmodel(H, R, ndim_state, mapping):
# Check first values produced by seed match
for _ in range(3):
assert all(lg1.rvs() == lg2.rvs())


@pytest.mark.parametrize(
"H, R, B, ndim_state, mapping",
[
( # 1D meas, 2D state
np.array([[1, 0]]),
np.array([[0.1]]),
np.array([[1.0]]),
2,
[0],
),
( # 2D meas, 4D state
np.array([[1, 0, 0, 0], [0, 0, 1, 0]]),
np.diag([0.1, 0.1]),
np.array([[1.0]]),
4,
[0, 2],
),
( # 4D meas, 2D state
np.array([[1, 0], [0, 0], [0, 1], [0, 0]]),
np.diag([0.1, 0.1, 0.1, 0.1]),
np.array([[1.0]]),
2,
[0, None, 1, None],
),
],
ids=["1D_meas:2D_state", "2D_meas:4D_state", "4D_meas:2D_state"]
)
def test_glmodel(H, R, B, ndim_state, mapping):
"""
Test for the generalised linear Gaussian measurement model.
"""

state_vec = np.array([[n] for n in range(ndim_state)])
state = State(state_vec)

glm = GeneralLinearGaussian(ndim_state=ndim_state,
mapping=mapping,
meas_matrix=H,
bias_value=B,
noise_covar=R)

# Ensure ``glm.matrix()`` returns H
assert np.array_equal(H, glm.matrix())

# Ensure ``glm.jacobian()`` returns H
assert np.array_equal(H, glm.jacobian(state=state))

# Ensure ``glm.covar()`` returns R
assert np.array_equal(R, glm.covar())

# Ensure ``glm.bias()`` returns B
assert np.array_equal(B, glm.bias())

# Project a state through the model
# (without noise)
meas_pred_wo_noise = glm.function(state)
assert np.array_equal(meas_pred_wo_noise, H @ state_vec + B)

# Evaluate the likelihood of the predicted measurement, given the state
# (without noise)
prob = glm.pdf(State(meas_pred_wo_noise), state)
assert approx(prob) == multivariate_normal.pdf(
meas_pred_wo_noise.T,
mean=np.array(H @ state_vec + B).ravel(),
cov=R)

# Propagate a state vector through the model
# (with external noise)
noise = glm.rvs()
meas_pred_w_enoise = glm.function(state,
noise=noise)
assert np.array_equal(meas_pred_w_enoise, H @ state_vec + B + noise)

# Evaluate the likelihood of the predicted measurement, given the state
# (with external noise)
prob = glm.pdf(State(meas_pred_w_enoise), state)
assert approx(prob) == multivariate_normal.pdf(
meas_pred_w_enoise.T,
mean=np.array(H @ state_vec + B).ravel(),
cov=R)

# Test that a fixed random seed gives consistent results
glm1 = GeneralLinearGaussian(ndim_state=ndim_state,
mapping=mapping,
meas_matrix=H,
bias_value=B,
noise_covar=R,
seed=1)
glm2 = GeneralLinearGaussian(ndim_state=ndim_state,
mapping=mapping,
meas_matrix=H,
bias_value=B,
noise_covar=R,
seed=1)

# Check first values produced by seed match
for _ in range(3):
assert all(glm1.rvs() == glm2.rvs())
84 changes: 83 additions & 1 deletion stonesoup/updater/iterated.py
@@ -6,13 +6,15 @@
from ..measures import Measure, KLDivergence
from ..models.measurement import MeasurementModel
from ..models.transition import TransitionModel
from ..models.measurement.linear import GeneralLinearGaussian
from ..predictor import Predictor
from .kalman import ExtendedKalmanUpdater
from .kalman import ExtendedKalmanUpdater, UnscentedKalmanUpdater, KalmanUpdater
from ..predictor.kalman import ExtendedKalmanPredictor
from ..smoother import Smoother
from ..smoother.kalman import ExtendedKalmanSmoother
from ..types.prediction import Prediction
from ..types.track import Track
from ..functions import slr_definition


class DynamicallyIteratedUpdater(Updater):
@@ -137,3 +139,83 @@ def __init__(self, *args, **kwargs):


ExtendedKalmanUpdater.register(DynamicallyIteratedEKFUpdater)


class IPLFKalmanUpdater(UnscentedKalmanUpdater):
"""
The update step of the IPLF algorithm.
"""

tolerance: float = Property(
default=1e-1,
doc="The value of the difference in the measure used as a stopping criterion.")
measure: Measure = Property(
default=KLDivergence(),
doc="The measure used to test the iteration stopping criterion. Defaults to the "
"KLDivergence between the current and previous posterior state estimates.")
max_iterations: int = Property(
default=5,
doc="Maximum number of iterations before the while loop is exited and a "
"non-convergence warning is issued.")

def update(self, hypothesis, force_symmetry=True, **kwargs):
r"""The IPLF update method. """

# Record the starting point: not a posterior yet, just the entry used in the KLD stopping criterion
prev_post_state = hypothesis.prediction  # the prior is used only on the first iteration, then updated

# Get the measurement model out of the measurement if it's there.
# If not, use the one native to the updater (which might still be none).
measurement_model = self._check_measurement_model(hypothesis.measurement.measurement_model)

# The first iteration is just the application of the UKF update.
updater = UnscentedKalmanUpdater(beta=self.beta, kappa=self.kappa, force_symmetric_covariance=True)
hypothesis.measurement_prediction = self.predict_measurement(
predicted_state=hypothesis.prediction,
measurement_model=measurement_model
) # UKF measurement prediction that relies on Unscented Transform and is required in the update
post_state = updater.update(hypothesis, **kwargs)

# Now update the measurement prediction mean and loop
iterations = 1
while self.measure(prev_post_state, post_state) > self.tolerance:

if iterations >= self.max_iterations:
warnings.warn("IPLF update reached maximum number of iterations.")
break

# The SLR is with respect to the approximated posterior in post_state,
# not the original prior in hypothesis.prediction
measurement_prediction = UnscentedKalmanUpdater(beta=self.beta, kappa=self.kappa).predict_measurement(
predicted_state=post_state,
measurement_model=measurement_model
)

h_matrix, b_vector, omega_cov_matrix = slr_definition(post_state,
measurement_prediction,
force_symmetry=force_symmetry)

measurement_model_linearized = GeneralLinearGaussian(
ndim_state=measurement_model.ndim_state,
mapping=measurement_model.mapping,
meas_matrix=h_matrix,
bias_value=b_vector,
noise_covar=omega_cov_matrix)

hypothesis.measurement_prediction = KalmanUpdater().predict_measurement(
predicted_state=hypothesis.prediction,
measurement_model=measurement_model_linearized)
hypothesis.measurement.measurement_model = measurement_model_linearized

prev_post_state = post_state
# update is computed using the original prior in hypothesis.prediction
post_state = KalmanUpdater().update(hypothesis, **kwargs)
post_state.hypothesis.measurement.measurement_model = measurement_model

if force_symmetry:
post_state.covar = (post_state.covar + post_state.covar.T) / 2

# increment counter
iterations += 1


return post_state
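
A short sketch of how the updater above might be exercised end to end; the bearing-range measurement model, prior and detection values below are assumptions chosen for illustration and are not part of this commit.

    import numpy as np

    from stonesoup.models.measurement.nonlinear import CartesianToBearingRange
    from stonesoup.types.detection import Detection
    from stonesoup.types.hypothesis import SingleHypothesis
    from stonesoup.types.prediction import GaussianStatePrediction
    from stonesoup.updater.iterated import IPLFKalmanUpdater

    # Nonlinear bearing-range measurement of a 4D (position/velocity) state
    measurement_model = CartesianToBearingRange(
        ndim_state=4, mapping=(0, 2),
        noise_covar=np.diag([np.radians(0.5) ** 2, 1.0]))

    # Illustrative prior and detection
    prediction = GaussianStatePrediction(
        state_vector=np.array([[10.], [1.], [5.], [0.5]]),
        covar=np.diag([1.5, 0.5, 1.5, 0.5]))
    measurement = Detection(
        state_vector=np.array([[np.arctan2(5., 10.)], [np.hypot(10., 5.)]]),
        measurement_model=measurement_model)

    # Iterate the posterior linearisation until the KL divergence between
    # successive posteriors drops below the tolerance (or max_iterations is hit)
    updater = IPLFKalmanUpdater(tolerance=1e-2, max_iterations=10)
    posterior = updater.update(SingleHypothesis(prediction, measurement))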