Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ def __init__(self, grid, grid_values, enforce_pdf_nonnegative=True):

def _check_normalization(self, tol=0.01):
"""Warn if any column is not normalized to 1 over the hemisphere."""
# Match HyperhemisphericalGridDistribution.get_manifold_size(), which
# uses the grid embedding dimension for the existing quadrature rule.
# Match HyperhemisphericalGridDistribution.get_manifold_size(): a grid
# in R^d represents S^{d-1}, so the quadrature surface uses d - 1.
hemisphere_surface = 0.5 * (
AbstractHypersphereSubsetDistribution.compute_unit_hypersphere_surface(
self.grid.shape[1]
self.grid.shape[1] - 1
)
)
ints = mean(self.grid_values, axis=0) * hemisphere_surface
Expand Down
23 changes: 23 additions & 0 deletions src/pyrecest/filters/kalman_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,29 @@ def update_linear_robust(
return diagnostics
return None

def update_model_robust(
self,
measurement_model,
measurement,
**kwargs,
):
"""Robustly update with a structural linear Gaussian model object."""
measurement_matrix = _get_required_model_attribute(
measurement_model,
"measurement_matrix",
)
meas_noise = _get_required_model_attribute(
measurement_model,
"meas_noise",
"measurement_noise_cov",
)
return self.update_linear_robust(
measurement,
measurement_matrix,
meas_noise,
**kwargs,
)

def update_model(
self,
measurement_model,
Expand Down
8 changes: 4 additions & 4 deletions src/pyrecest/filters/so3_grid_transition.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,10 @@ def so3_right_multiplication_grid_transition(
column_maxima = reshape(amax(exponents, axis=0), (1, exponents.shape[1]))
scores = exp(exponents - column_maxima)

manifold_size = (
0.5
* AbstractHypersphereSubsetDistribution.compute_unit_hypersphere_surface(
quaternion_grid.shape[1]
manifold_dim = quaternion_grid.shape[1] - 1
manifold_size = 0.5 * (
AbstractHypersphereSubsetDistribution.compute_unit_hypersphere_surface(
manifold_dim
)
)
column_integrals = sum(scores, axis=0, keepdims=True) / scores.shape[0]
Expand Down
176 changes: 176 additions & 0 deletions tests/filters/test_robust_linear_gaussian_update.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
import unittest

import numpy as np
import numpy.testing as npt

from pyrecest.backend import __backend_name__, array, eye, to_numpy
from pyrecest.distributions import GaussianDistribution
from pyrecest.filters import KalmanFilter
from pyrecest.filters._linear_gaussian import (
huber_covariance_scale,
linear_gaussian_update,
linear_gaussian_update_robust,
normalized_innovation_squared,
student_t_covariance_scale,
)


@unittest.skipIf(
__backend_name__ in ("pytorch", "jax"),
reason="tests compare backend scalars with NumPy helpers",
)
class RobustLinearGaussianUpdateTest(unittest.TestCase):
def setUp(self):
self.mean = array([0.0, 0.0])
self.covariance = eye(2)
self.measurement_matrix = eye(2)
self.meas_noise = eye(2)

def test_normalized_innovation_squared_matches_mahalanobis_norm(self):
innovation = array([2.0, 0.0])
innovation_covariance = eye(2) * 2.0

nis = normalized_innovation_squared(innovation, innovation_covariance)

self.assertAlmostEqual(float(nis), 2.0)

def test_student_t_scale_is_monotonic(self):
small = student_t_covariance_scale(2.0, measurement_dim=2, dof=4.0)
large = student_t_covariance_scale(100.0, measurement_dim=2, dof=4.0)

self.assertEqual(float(small), 1.0)
self.assertGreater(float(large), float(small))

def test_huber_scale_uses_mahalanobis_threshold(self):
inlier = huber_covariance_scale(4.0, huber_threshold=2.0)
outlier = huber_covariance_scale(100.0, huber_threshold=2.0)

self.assertEqual(float(inlier), 1.0)
self.assertGreater(float(outlier), 1.0)

def test_student_t_update_downweights_outlier(self):
measurement = array([100.0, 100.0])
plain_mean, _ = linear_gaussian_update(
self.mean,
self.covariance,
measurement,
self.measurement_matrix,
self.meas_noise,
)
robust_mean, _, diagnostics = linear_gaussian_update_robust(
self.mean,
self.covariance,
measurement,
self.measurement_matrix,
self.meas_noise,
robust_update="student-t",
student_t_dof=4.0,
return_diagnostics=True,
)

self.assertTrue(diagnostics["accepted"])
self.assertGreater(float(diagnostics["scale"]), 1.0)
self.assertLess(
np.linalg.norm(to_numpy(robust_mean)),
np.linalg.norm(to_numpy(plain_mean)),
)

def test_huber_update_downweights_outlier(self):
measurement = array([100.0, 100.0])
plain_mean, _ = linear_gaussian_update(
self.mean,
self.covariance,
measurement,
self.measurement_matrix,
self.meas_noise,
)
robust_mean, _, diagnostics = linear_gaussian_update_robust(
self.mean,
self.covariance,
measurement,
self.measurement_matrix,
self.meas_noise,
robust_update="huber",
huber_threshold=2.0,
return_diagnostics=True,
)

self.assertTrue(diagnostics["accepted"])
self.assertGreater(float(diagnostics["scale"]), 1.0)
self.assertLess(
np.linalg.norm(to_numpy(robust_mean)),
np.linalg.norm(to_numpy(plain_mean)),
)

def test_none_method_matches_standard_update(self):
measurement = array([1.0, -1.0])
plain_mean, plain_covariance = linear_gaussian_update(
self.mean,
self.covariance,
measurement,
self.measurement_matrix,
self.meas_noise,
)
robust_mean, robust_covariance = linear_gaussian_update_robust(
self.mean,
self.covariance,
measurement,
self.measurement_matrix,
self.meas_noise,
robust_update="none",
)

npt.assert_allclose(to_numpy(robust_mean), to_numpy(plain_mean))
npt.assert_allclose(to_numpy(robust_covariance), to_numpy(plain_covariance))

def test_gate_rejects_without_robust_update(self):
measurement = array([100.0, 100.0])
robust_mean, robust_covariance, diagnostics = linear_gaussian_update_robust(
self.mean,
self.covariance,
measurement,
self.measurement_matrix,
self.meas_noise,
robust_update="none",
gate_threshold=1.0,
return_diagnostics=True,
)

self.assertFalse(diagnostics["accepted"])
npt.assert_allclose(to_numpy(robust_mean), to_numpy(self.mean))
npt.assert_allclose(to_numpy(robust_covariance), to_numpy(self.covariance))

def test_kalman_filter_update_linear_robust_returns_diagnostics(self):
kf = KalmanFilter(GaussianDistribution(self.mean, self.covariance))
diagnostics = kf.update_linear_robust(
array([100.0, 100.0]),
self.measurement_matrix,
self.meas_noise,
robust_update="student-t",
return_diagnostics=True,
)

self.assertIn("nis", diagnostics)
self.assertTrue(diagnostics["accepted"])
self.assertGreater(float(diagnostics["scale"]), 1.0)

def test_update_model_robust_uses_structural_measurement_model(self):
class MeasurementModel:
measurement_matrix = self.measurement_matrix
meas_noise = self.meas_noise

kf = KalmanFilter(GaussianDistribution(self.mean, self.covariance))
diagnostics = kf.update_model_robust(
MeasurementModel(),
array([100.0, 100.0]),
robust_update="huber",
return_diagnostics=True,
)

self.assertIn("nis", diagnostics)
self.assertTrue(diagnostics["accepted"])
self.assertGreater(float(diagnostics["scale"]), 1.0)


if __name__ == "__main__":
unittest.main()
Loading