    def update_with_block_log_likelihoods(
        self,
        log_likelihood: Callable | Sequence,
        measurement=None,
        resample: bool = True,
        ess_threshold=None,
    ):
        """Update block weights from block log-likelihoods.

        The log-likelihood must evaluate to an array shaped
        ``(n_blocks, n_particles)``. Each row updates the corresponding block's
        weights independently using log-sum-exp normalization. ``-inf`` entries
        are allowed and zero out the corresponding particle weight.
        """
        if callable(log_likelihood):
            # Same calling convention as the likelihood-space API: either
            # f(particles) or f(measurement, particles).
            if measurement is None:
                log_likelihood_values = log_likelihood(self.particles)
            else:
                log_likelihood_values = log_likelihood(measurement, self.particles)
        else:
            log_likelihood_values = log_likelihood
        log_likelihood_values = array(log_likelihood_values, dtype=float)
        if log_likelihood_values.shape != (len(self.partition), self.n_particles):
            raise ValueError(
                "block log-likelihoods must have shape "
                f"({len(self.partition)}, {self.n_particles})."
            )

        # Each partition block carries its own weight vector; update every
        # block independently in log space for numerical stability.
        self._block_weights = stack(
            [
                self._normalize_log_weights(
                    log(self._block_weights[block_idx])
                    + log_likelihood_values[block_idx]
                )
                for block_idx in range(len(self.partition))
            ],
            axis=0,
        )
        # NOTE(review): ESS bookkeeping and the resample/ess_threshold handling
        # continue in unchanged context lines not visible in this hunk — confirm
        # against the full file.

    def update_with_component_log_likelihoods(
        self,
        component_log_likelihoods,
        *,
        resample: bool = True,
        ess_threshold=None,
    ):
        """Update from per-component log-likelihoods shaped ``(n_particles, K)``.

        Columns belonging to the same partition block are summed (a product of
        component likelihoods is a sum of log-likelihoods) before delegating to
        :meth:`update_with_block_log_likelihoods`.
        """
        component_log_likelihoods = array(component_log_likelihoods, dtype=float)
        if component_log_likelihoods.shape != (self.n_particles, self.num_rotations):
            raise ValueError(
                "component_log_likelihoods must have shape "
                f"({self.n_particles}, {self.num_rotations})."
            )

        block_log_likelihoods = []
        for block in self.partition:
            # Sum this block's component columns along the component axis.
            block_log_likelihoods.append(
                sum(
                    stack(
                        [
                            component_log_likelihoods[:, component_idx]
                            for component_idx in block
                        ],
                        axis=1,
                    ),
                    axis=1,
                )
            )
        return self.update_with_block_log_likelihoods(
            stack(block_log_likelihoods, axis=0),
            resample=resample,
            ess_threshold=ess_threshold,
        )

    def update_with_geodesic_log_likelihood(
        self,
        measurement,
        noise_std=None,
        *,
        component_noise_std=None,
        mask=None,
        confidence=None,
        max_noise_std=None,
        confidence_exponent: float = 1.0,
        outlier_prob: float = 0.0,
        resample: bool = True,
        ess_threshold=None,
    ):
        """Update partition weights with masked component geodesic log-likelihoods.

        Thin wrapper: evaluates the per-component geodesic log-likelihoods via
        the base-class helper and routes them through the partitioned
        component update.
        """
        return self.update_with_component_log_likelihoods(
            self.component_geodesic_log_likelihood(
                measurement,
                noise_std,
                component_noise_std=component_noise_std,
                mask=mask,
                confidence=confidence,
                max_noise_std=max_noise_std,
                confidence_exponent=confidence_exponent,
                outlier_prob=outlier_prob,
            ),
            resample=resample,
            ess_threshold=ess_threshold,
        )

    def update_with_geodesic_likelihood(
        self,
        measurement,
        noise_std,
        *,
        component_noise_std=None,
        mask=None,
        confidence=None,
        max_noise_std=None,
        confidence_exponent: float = 1.0,
        outlier_prob: float = 0.0,
        resample: bool = True,
        ess_threshold=None,
    ):
        """Update with masked geodesic likelihoods per partition block.

        This preserves the existing likelihood-space API while delegating to the
        log-likelihood implementation for numerical stability.
        """
        return self.update_with_geodesic_log_likelihood(
            measurement,
            noise_std,
            component_noise_std=component_noise_std,
            mask=mask,
            confidence=confidence,
            max_noise_std=max_noise_std,
            confidence_exponent=confidence_exponent,
            outlier_prob=outlier_prob,
            resample=resample,
            ess_threshold=ess_threshold,
        )
    @staticmethod
    def _normalize_log_weights(log_weights):
        """Normalize possibly unscaled log weights with log-sum-exp stability.

        Subtracting the maximum log weight before exponentiating avoids
        overflow/underflow; ``-inf`` entries map to exactly zero weight. The
        result is a weight vector that sums to one.
        """
        log_weights = array(log_weights, dtype=float)
        if ndim(log_weights) != 1:
            # Accept column vectors and similar shapes by flattening.
            log_weights = reshape(log_weights, (-1,))
        if any(isnan(log_weights)):
            raise ValueError("log weights must not contain NaN.")
        max_log_weight = max(log_weights)
        if not isfinite(max_log_weight):
            # max == -inf means all weights would be zero; max == +inf would
            # make the shifted exponentials ill-defined.
            raise ValueError(
                "At least one log weight must be finite and no log weight may be +inf."
            )
        return SO3ProductParticleFilter._normalize_weights(
            exp(log_weights - max_log_weight)
        )

    @staticmethod
    def _validate_probability(name: str, value: float) -> float:
        """Return ``value`` as a float after checking it is a probability in [0, 1]."""
        value = float(value)
        if not isfinite(value) or value < 0.0 or value > 1.0:
            raise ValueError(f"{name} must be a finite probability in [0, 1].")
        return value

    @staticmethod
    def _as_component_mask(mask, num_rotations):
        """Validate a per-component mask; ``None`` activates all components."""
        if mask is None:
            return ones(num_rotations)
        mask = array(mask, dtype=float)
        if mask.shape != (num_rotations,):
            raise ValueError("mask must have shape (num_rotations,).")
        if not all(isfinite(mask)):
            raise ValueError("mask values must be finite.")
        if not all(mask >= 0.0):
            raise ValueError("mask values must be nonnegative.")
        return mask

    @staticmethod
    def _as_component_confidence(confidence, num_rotations):
        """Validate per-component confidences in ``[0, 1]``; ``None`` -> all ones."""
        if confidence is None:
            return ones(num_rotations)
        confidence = array(confidence, dtype=float)
        if confidence.shape != (num_rotations,):
            raise ValueError("confidence must have shape (num_rotations,).")
        if not all(isfinite(confidence)):
            raise ValueError("confidence values must be finite.")
        if not all(confidence >= 0.0) or not all(confidence <= 1.0):
            raise ValueError("confidence values must be in [0, 1].")
        return confidence

    @staticmethod
    def _as_component_noise_std(noise_std, component_noise_std, num_rotations):
        """Resolve scalar or per-component noise scales to a ``(num_rotations,)`` array.

        ``component_noise_std`` takes precedence over the scalar ``noise_std``;
        at least one of the two must be provided.
        """
        if component_noise_std is None:
            if noise_std is None:
                raise ValueError("noise_std or component_noise_std must be provided.")
            sigma = array(noise_std, dtype=float)
        else:
            sigma = array(component_noise_std, dtype=float)

        if ndim(sigma) == 0:
            # Scalar input: broadcast one positive scale to every component.
            if not isfinite(sigma) or sigma <= 0.0:
                raise ValueError("noise standard deviations must be positive and finite.")
            return ones(num_rotations) * sigma
        if sigma.shape != (num_rotations,):
            raise ValueError("component_noise_std must have shape (num_rotations,).")
        if not all(isfinite(sigma)):
            raise ValueError("noise standard deviations must be finite.")
        if not all(sigma > 0.0):
            raise ValueError("noise standard deviations must be positive.")
        return sigma

    @staticmethod
    def confidence_to_noise_std(
        confidence,
        noise_std,
        max_noise_std,
        *,
        confidence_exponent: float = 1.0,
        mask=None,
    ):
        """Map detector confidence values in ``[0, 1]`` to SO(3) noise scales.

        The mapping is
        ``sigma(c)^2 = noise_std^2 + (1 - c)^confidence_exponent *``
        ``(max_noise_std^2 - noise_std^2)``,
        so full confidence yields ``noise_std`` and zero confidence yields
        ``max_noise_std``. When a mask is supplied, masked-out components
        (mask value 0) are pinned to ``max_noise_std``.
        """
        confidence = array(confidence, dtype=float)
        if ndim(confidence) != 1:
            raise ValueError("confidence must be a one-dimensional array.")
        num_rotations = confidence.shape[0]
        confidence = SO3ProductParticleFilter._as_component_confidence(
            confidence, num_rotations
        )
        if noise_std is None or max_noise_std is None:
            raise ValueError("noise_std and max_noise_std must be provided.")
        min_sigma = float(noise_std)
        max_sigma = float(max_noise_std)
        exponent = float(confidence_exponent)
        if min_sigma <= 0.0 or not isfinite(min_sigma):
            raise ValueError("noise_std must be positive and finite.")
        if max_sigma <= 0.0 or not isfinite(max_sigma):
            raise ValueError("max_noise_std must be positive and finite.")
        if max_sigma < min_sigma:
            raise ValueError("max_noise_std must be greater than or equal to noise_std.")
        if exponent <= 0.0 or not isfinite(exponent):
            raise ValueError("confidence_exponent must be positive and finite.")

        # Interpolate in variance space between the min and max noise levels.
        variance = min_sigma * min_sigma + (1.0 - confidence) ** exponent * (
            max_sigma * max_sigma - min_sigma * min_sigma
        )
        # The floor only guards against round-off; mathematically
        # variance >= min_sigma**2 > 0 here.
        sigma = sqrt(maximum(variance, 1e-16))
        if mask is not None:
            mask = SO3ProductParticleFilter._as_component_mask(mask, num_rotations)
            sigma = where(mask > 0.0, sigma, max_sigma)
        return sigma
    def update_with_log_likelihood(
        self,
        log_likelihood: Callable,
        measurement=None,
        resample: bool = True,
        ess_threshold=None,
    ):
        """Update weights from log-likelihoods evaluated on product particles.

        ``log_likelihood`` is either a callable — invoked as ``f(particles)``
        or ``f(measurement, particles)`` when a measurement is given — or a
        precomputed array with one value per particle. ``-inf`` entries zero
        the corresponding particle. Resamples systematically when the
        effective sample size falls below ``ess_threshold`` (default
        ``n_particles / 2``) and returns the ESS.
        """
        if callable(log_likelihood):
            if measurement is None:
                log_likelihood_values = log_likelihood(self.particles)
            else:
                log_likelihood_values = log_likelihood(measurement, self.particles)
        else:
            log_likelihood_values = log_likelihood
        log_likelihood_values = array(log_likelihood_values, dtype=float)
        if log_likelihood_values.shape != self.weights.shape:
            raise ValueError("log_likelihood must return one value per particle.")

        # Bayes update in log space: log posterior ∝ log prior + log likelihood.
        self.filter_state.w = self._normalize_log_weights(
            log(self.weights) + log_likelihood_values
        )
        ess = self.effective_sample_size()
        threshold = self.n_particles / 2.0 if ess_threshold is None else ess_threshold
        if resample and ess < threshold:
            self.resample_systematic()
        return ess

    def component_geodesic_log_likelihood(
        self,
        measurement,
        noise_std=None,
        *,
        component_noise_std=None,
        mask=None,
        confidence=None,
        max_noise_std=None,
        confidence_exponent: float = 1.0,
        outlier_prob: float = 0.0,
    ):
        """Return per-component masked geodesic log-likelihoods.

        The returned array has shape ``(n_particles, num_rotations)``. Masks
        deactivate components, confidence values in ``[0, 1]`` scale each
        component's log-likelihood contribution, ``component_noise_std`` supplies
        heteroskedastic per-component noise, and ``outlier_prob`` adds an
        unnormalized constant likelihood floor for robustness.
        """
        if max_noise_std is not None and component_noise_std is not None:
            raise ValueError(
                "component_noise_std and max_noise_std are mutually exclusive."
            )
        measurement = self._as_product_point(measurement, self.num_rotations)
        mask = self._as_component_mask(mask, self.num_rotations)
        confidence_weights = self._as_component_confidence(
            confidence, self.num_rotations
        )
        # Components are tempered multiplicatively: the mask gates a component
        # and the confidence scales its contribution.
        component_weights = mask * confidence_weights

        if max_noise_std is not None:
            if confidence is None:
                raise ValueError(
                    "confidence must be provided when max_noise_std is used."
                )
            # Derive per-component noise scales from the confidences.
            component_noise_std = self.confidence_to_noise_std(
                confidence,
                noise_std,
                max_noise_std,
                confidence_exponent=confidence_exponent,
                mask=mask,
            )
        sigma = self._as_component_noise_std(
            noise_std, component_noise_std, self.num_rotations
        )
        eps = self._validate_probability("outlier_prob", outlier_prob)

        # NOTE(review): the comprehension interior is unchanged hunk context in
        # the patch; it is reconstructed here as the per-component geodesic
        # distance between each particle's component and the measurement —
        # confirm against the full file.
        distances = stack(
            [
                geodesic_distance(self.particles[:, i, :], measurement[i, :])
                for i in range(self.num_rotations)
            ],
            axis=1,
        )
        # Unnormalized isotropic Gaussian on the geodesic distance, per component.
        clean_log_likelihood = -0.5 * (distances / sigma) ** 2
        if eps > 0.0:
            # Robust mixture (1 - eps) * clean + eps * constant floor, evaluated
            # with a log-sum-exp; the clamps keep both mixture weights strictly
            # inside (0, 1) so neither log() argument is zero.
            clean_probability = eps if eps < 1.0 - 1e-12 else 1.0 - 1e-12
            outlier_probability = eps if eps > 1e-12 else 1e-12
            clean_term = log(1.0 - clean_probability) + clean_log_likelihood
            outlier_term = log(outlier_probability)
            normalizer = maximum(clean_term, outlier_term)
            clean_log_likelihood = normalizer + log(
                exp(clean_term - normalizer) + exp(outlier_term - normalizer)
            )
        return component_weights * clean_log_likelihood

    def geodesic_log_likelihood(
        self,
        measurement,
        noise_std=None,
        *,
        component_noise_std=None,
        mask=None,
        confidence=None,
        max_noise_std=None,
        confidence_exponent: float = 1.0,
        outlier_prob: float = 0.0,
    ):
        """Return one masked geodesic log-likelihood per product particle.

        Sums the per-component log-likelihoods over the component axis, i.e.
        treats the rotation components as conditionally independent.
        """
        return sum(
            self.component_geodesic_log_likelihood(
                measurement,
                noise_std,
                component_noise_std=component_noise_std,
                mask=mask,
                confidence=confidence,
                max_noise_std=max_noise_std,
                confidence_exponent=confidence_exponent,
                outlier_prob=outlier_prob,
            ),
            axis=1,
        )

    def update_with_geodesic_log_likelihood(
        self,
        measurement,
        noise_std=None,
        *,
        component_noise_std=None,
        mask=None,
        confidence=None,
        max_noise_std=None,
        confidence_exponent: float = 1.0,
        outlier_prob: float = 0.0,
        resample: bool = True,
        ess_threshold=None,
    ):
        """Update with a masked, confidence-aware geodesic log-likelihood.

        Convenience wrapper: evaluates :meth:`geodesic_log_likelihood` and
        feeds it to :meth:`update_with_log_likelihood`; returns the ESS.
        """
        return self.update_with_log_likelihood(
            self.geodesic_log_likelihood(
                measurement,
                noise_std,
                component_noise_std=component_noise_std,
                mask=mask,
                confidence=confidence,
                max_noise_std=max_noise_std,
                confidence_exponent=confidence_exponent,
                outlier_prob=outlier_prob,
            ),
            resample=resample,
            ess_threshold=ess_threshold,
        )
    def update_with_geodesic_likelihood(
        self,
        measurement,
        noise_std,
        *,
        component_noise_std=None,
        mask=None,
        confidence=None,
        max_noise_std=None,
        confidence_exponent: float = 1.0,
        outlier_prob: float = 0.0,
        resample: bool = True,
        ess_threshold=None,
    ):
        """Update with a masked geodesic likelihood on SO(3)^K.

        This method preserves the existing likelihood-space API and delegates to
        the log-likelihood implementation for numerical stability. All extra
        keyword arguments are forwarded unchanged; the result is the ESS
        returned by the delegate.
        """
        return self.update_with_geodesic_log_likelihood(
            measurement,
            noise_std,
            component_noise_std=component_noise_std,
            mask=mask,
            confidence=confidence,
            max_noise_std=max_noise_std,
            confidence_exponent=confidence_exponent,
            outlier_prob=outlier_prob,
            resample=resample,
            ess_threshold=ess_threshold,
        )
    def test_geodesic_log_update_uses_confidence_per_component(self):
        # Zero confidence must neutralize a component's update; per-component
        # noise must let the informative component dominate.
        filt = PartitionedSO3ProductParticleFilter(
            n_particles=2,
            num_rotations=2,
            partition="singleton",
        )
        filt.set_particles(
            stack(
                [
                    array([[0.0, 0.0, 0.0, 1.0], z_quaternion(0.0)]),
                    array([z_quaternion(pi / 2.0), z_quaternion(pi / 2.0)]),
                ],
                axis=0,
            )
        )

        filt.update_with_geodesic_log_likelihood(
            array([[0.0, 0.0, 0.0, 1.0], z_quaternion(pi / 2.0)]),
            noise_std=0.2,
            confidence=array([1.0, 0.0]),
            resample=False,
        )

        # Block 0: particle 0 matches the measurement exactly, so it wins.
        self.assertGreater(float(filt.block_weights[0, 0]), 0.99)
        # Block 1: zero confidence leaves the uniform prior untouched.
        npt.assert_allclose(filt.block_weights[1], array([0.5, 0.5]), atol=ATOL)

        filt.update_with_geodesic_log_likelihood(
            array([[0.0, 0.0, 0.0, 1.0], z_quaternion(pi / 2.0)]),
            component_noise_std=array([10.0, 0.2]),
            resample=False,
        )
        # Tight noise on component 1 now identifies particle 1 in block 1.
        self.assertGreater(float(filt.block_weights[1, 1]), 0.99)

    def test_log_likelihood_update_supports_zero_likelihoods(self):
        # -inf log-likelihoods (zero likelihood) must be handled without NaNs.
        filt = SO3ProductParticleFilter(n_particles=3, num_rotations=1)

        ess = filt.update_with_log_likelihood(
            array([0.0, -1000.0, -float("inf")]),
            resample=False,
        )

        # Nearly all weight concentrates on the first particle.
        self.assertLess(float(ess), 2.0)
        self.assertGreater(float(filt.weights[0]), 0.999)
        # The -inf entry yields exactly zero weight.
        npt.assert_allclose(float(filt.weights[2]), 0.0, atol=ATOL)

    def test_confidence_to_noise_std_maps_confidence_to_scale(self):
        # Confidence 1 -> noise_std, confidence 0 -> max_noise_std,
        # intermediate confidences interpolate strictly in between.
        sigma = SO3ProductParticleFilter.confidence_to_noise_std(
            array([1.0, 0.5, 0.0]),
            noise_std=0.1,
            max_noise_std=1.0,
        )

        npt.assert_allclose(float(sigma[0]), 0.1, atol=ATOL)
        npt.assert_allclose(float(sigma[2]), 1.0, atol=ATOL)
        self.assertGreater(float(sigma[1]), 0.1)
        self.assertLess(float(sigma[1]), 1.0)

    def test_geodesic_log_likelihood_uses_confidence_and_component_noise(self):
        measurement = array([[0.0, 0.0, 0.0, 1.0], z_quaternion(pi / 2.0)])
        particles = stack(
            [
                array([[0.0, 0.0, 0.0, 1.0], z_quaternion(0.0)]),
                array([z_quaternion(pi / 2.0), z_quaternion(pi / 2.0)]),
            ],
            axis=0,
        )

        # With full confidence on component 0 only, particle 0 should win.
        confidence_filter = SO3ProductParticleFilter(n_particles=2, num_rotations=2)
        confidence_filter.set_particles(particles)
        confidence_filter.update_with_geodesic_log_likelihood(
            measurement,
            noise_std=0.2,
            confidence=array([1.0, 0.0]),
            resample=False,
        )
        self.assertGreater(float(confidence_filter.weights[0]), 0.99)

        # With loose noise on component 0 and tight noise on component 1,
        # particle 1 should outweigh particle 0.
        heteroskedastic_filter = SO3ProductParticleFilter(n_particles=2, num_rotations=2)
        heteroskedastic_filter.set_particles(particles)
        heteroskedastic_filter.update_with_geodesic_log_likelihood(
            measurement,
            component_noise_std=array([10.0, 0.2]),
            resample=False,
        )
        self.assertGreater(
            float(heteroskedastic_filter.weights[1]),
            float(heteroskedastic_filter.weights[0]),
        )