
Commit

Merge pull request #868 from timothy-glover/create_kld_reward_and_measure

Introduce Kullback-Leibler Divergence measure and reward function
sdhiscocks committed Nov 1, 2023
2 parents e6c15fa + f9d032a commit 444f8e3
Showing 5 changed files with 546 additions and 123 deletions.
1 change: 1 addition & 0 deletions setup.cfg
@@ -41,6 +41,7 @@ dev =
pytest-flake8
pytest-cov
pytest-remotedata
pytest-skip-slow
Sphinx
sphinx_rtd_theme>=1.2
sphinx-gallery>=0.10.1
63 changes: 62 additions & 1 deletion stonesoup/measures.py
@@ -6,7 +6,7 @@
from scipy.spatial import distance

from .base import Base, Property
from .types.state import State
from .types.state import State, ParticleState


class Measure(Base):
@@ -401,3 +401,64 @@ def __call__(self, state1, state2):
mins = [min(s1, s2) for s1, s2 in zip(s1, s2)]
maxs = [max(s1, s2) for s1, s2 in zip(s1, s2)]
return np.sum(mins)/np.sum(maxs)


class KLDivergence(Measure):
r"""Kullback-Leibler divergence between two distributions

Kullback-Leibler divergence, also referred to as relative entropy, is a
statistical distance. It describes how one probability distribution
differs from another. The expression for Kullback-Leibler divergence
is given by [1]_

.. math::
    D_{KL}(P\Vert Q) = \sum_x P(x)\log \frac{P(x)}{Q(x)},

where :math:`P(x)` is the first distribution, or ``state1``, and :math:`Q(x)`
is the second distribution, or ``state2``. Note that Kullback-Leibler
divergence is not symmetric under interchange of :math:`P(x)` and :math:`Q(x)`. The
implementation here uses the natural log, so the returned divergence has units of nats.
This implementation assumes a discrete probability space and currently only accepts
:class:`~.ParticleState`.

References
----------
.. [1] MacKay, David J. C. 2003. Information Theory, Inference and Learning
   Algorithms, 1st Ed. Cambridge University Press.
"""

def __call__(self, state1, state2):
r"""
Computes the Kullback–Leibler divergence from ``state1`` to ``state2``
Parameters
----------
state1 : :class:`~.ParticleState`
state2 : :class:`~.ParticleState`
Returns
-------
float
Kullback–Leibler divergence from ``state1`` to ``state2``
"""
if isinstance(state1, ParticleState) and isinstance(state2, ParticleState):
if len(state1.particles) == len(state2.particles):

log_term = np.zeros(state1.log_weight.shape)

invalid_indx = (np.isinf(state1.log_weight) | np.isnan(state1.log_weight)
| np.isinf(state2.log_weight) | np.isnan(state2.log_weight))

# Do not consider NANs and inf in the subtraction
log_term[~invalid_indx] = state1.log_weight[~invalid_indx] \
- state2.log_weight[~invalid_indx]

kld = np.sum(np.exp(state1.log_weight)*log_term)
else:
raise ValueError(f'The input sizes are not compatible '
f'({len(state1.particles)} != {len(state2.particles)})')
else:
raise NotImplementedError('This measure is currently only compatible with '
'ParticleState types')

return kld
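
For orientation, a minimal usage sketch of the new measure follows (not part of this commit). It assumes the ParticleState constructor accepts a plain weight array and that both states share a common particle set, since the divergence is computed element-wise from the particle log weights; the seed, particle count and likelihood parameters are illustrative only.

import numpy as np
from scipy.stats import norm

from stonesoup.measures import KLDivergence
from stonesoup.types.array import StateVectors
from stonesoup.types.state import ParticleState

rng = np.random.default_rng(1991)
n_particles = 200

# Common particle set: 1D positions drawn from a broad prior
particles = StateVectors(rng.normal(0.0, 2.0, (1, n_particles)))

# "Prediction": uniform weights over the particles
prediction = ParticleState(particles, weight=np.full(n_particles, 1 / n_particles))

# "Update": the same particles re-weighted by a Gaussian likelihood of a measurement at 1.0
likelihood = norm.pdf(np.ravel(particles), loc=1.0, scale=1.0)
update = ParticleState(particles, weight=likelihood / np.sum(likelihood))

divergence = KLDivergence()(update, prediction)  # D_KL(update || prediction), in nats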
194 changes: 192 additions & 2 deletions stonesoup/sensormanager/reward.py
@@ -5,14 +5,22 @@

import numpy as np

from ..measures import KLDivergence
from ..platform import Platform
from ..sensor.actionable import Actionable
from ..types.detection import TrueDetection
from ..base import Base, Property
from ..predictor.particle import ParticlePredictor
from ..predictor.kalman import KalmanPredictor
from ..updater.kalman import ExtendedKalmanUpdater
from ..types.track import Track
from ..types.hypothesis import SingleHypothesis
from ..sensor.sensor import Sensor
from ..sensor.action import Action
from ..types.prediction import Prediction
from ..updater.particle import ParticleUpdater
from ..resampler.particle import SystematicResampler
from ..types.state import State


class RewardFunction(Base, ABC):
@@ -85,15 +85,17 @@ def __call__(self, config: Mapping[Sensor, Sequence[Action]], tracks: Set[Track]
# Reward value
config_metric = 0

predicted_sensors = list()
predicted_sensors = set()
memo = {}
# For each sensor in the configuration
for sensor, actions in config.items():
predicted_sensor = copy.deepcopy(sensor, memo)
predicted_sensor.add_actions(actions)
predicted_sensor.act(metric_time)
if isinstance(sensor, Sensor):
predicted_sensors.append(predicted_sensor) # checks if it's a sensor
predicted_sensors.add(predicted_sensor) # checks if it's a sensor
elif isinstance(sensor, Platform):
predicted_sensors.update(predicted_sensor.sensors)

# Create dictionary of predictions for the tracks in the configuration
predicted_tracks = set()
@@ -133,3 +143,183 @@ def __call__(self, config: Mapping[Sensor, Sequence[Action]], tracks: Set[Track]

# Return value of configuration metric
return config_metric


class ExpectedKLDivergence(RewardFunction):
"""A reward function that implements the Kullback-Leibler divergence
for quantifying relative information gain between actions taken by
a sensor or group of sensors.

From a configuration of sensors and actions, an expected measurement is
generated based on the predicted distribution and an action being taken.
An update is generated based on this measurement. The Kullback-Leibler
divergence is then calculated between the predicted and updated target
distributions resulting from the measurement. A larger divergence
between these distributions equates to more information gained from
the action and the resulting measurement.
"""

predictor: ParticlePredictor = Property(default=None,
doc="Predictor used to predict the track to a "
"new state. This reward function is only "
"compatible with :class:`~.ParticlePredictor` "
"types.")
updater: ParticleUpdater = Property(default=None,
doc="Updater used to update the track to the new state. "
"This reward function is only compatible with "
":class:`~.ParticleUpdater` types.")
method_sum: bool = Property(default=True, doc="Determines method of calculating reward. "
"Default calculates sum across all targets. "
"Otherwise calculates mean of all targets.")

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.KLD = KLDivergence()
if self.predictor is not None and not isinstance(self.predictor, ParticlePredictor):
raise NotImplementedError('Only ParticlePredictor types are currently compatible '
'with this reward function')
if self.updater is not None and not isinstance(self.updater, ParticleUpdater):
raise NotImplementedError('Only ParticleUpdater types are currently compatible '
'with this reward function')

def __call__(self, config: Mapping[Sensor, Sequence[Action]], tracks: Set[Track],
metric_time: datetime.datetime, *args, **kwargs):
"""
For a given configuration of sensors and actions this reward function
calculates the expected Kullback-Leibler divergence of each track. It is
calculated between the prediction and the posterior assuming an expected update
based on a predicted measurement.
This requires a mapping of sensors to action(s) to be evaluated by the
reward function, a set of tracks at given time and the time at which
the actions would be carried out until.
The metric returned is the total expected Kullback-Leibler
divergence across all tracks.
Returns
-------
: float
Kullback-Leibler divergence for given configuration
"""

# Reward value
kld = 0.

memo = {}
predicted_sensors = set()
# For each actionable in the configuration
for actionable, actions in config.items():
# There is currently no Actionable base for platforms, hence check for either Platform or Actionable
if isinstance(actionable, Platform) or isinstance(actionable, Actionable):
predicted_actionable = copy.deepcopy(actionable, memo)
predicted_actionable.add_actions(actions)
predicted_actionable.act(metric_time)
if isinstance(actionable, Sensor):
predicted_sensors.add(predicted_actionable) # checks if it's a sensor
elif isinstance(actionable, Platform):
predicted_sensors.update(predicted_actionable.sensors)

# Create dictionary of predictions for the tracks in the configuration
predicted_tracks = set()
for track in tracks:
predicted_track = Track()
if self.predictor:
predicted_track = copy.copy(track)
predicted_track.append(self.predictor.predict(track[-1],
timestamp=metric_time))
else:
predicted_track.append(Prediction.from_state(track[-1],
timestamp=metric_time))

predicted_tracks.add(predicted_track)

for sensor in predicted_sensors:
# Assumes one detection per track

detections = self._generate_detections(predicted_tracks, sensor)

for predicted_track, detection_set in detections.items():

for n, detection in enumerate(detection_set):

# if detection:
# Generate hypothesis based on prediction/previous update and detection
hypothesis = SingleHypothesis(predicted_track, detection)

# Do the update based on this hypothesis and store covariance matrix
update = self.updater.update(hypothesis)

# else:
# update = copy.copy(predicted_track[-1])

kld += self.KLD(predicted_track[-1], update)

if self.method_sum is False and len(detections) != 0:

kld /= len(detections)

# Return value of configuration metric
return kld

def _generate_detections(self, predicted_tracks, sensor):

detections = {}
for predicted_track in predicted_tracks:
track_detections = set()
track_detections.update(sensor.measure({State(predicted_track.mean)}, noise=True))

detections.update({predicted_track: track_detections})

return detections
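
As a rough sketch of how ExpectedKLDivergence might be wired into a sensor manager (not part of this diff): the transition model is illustrative, and `sensor` and `tracks` are placeholders assumed to exist, standing in for an actionable sensor and a set of tracks holding particle estimates.

import datetime

from stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, ConstantVelocity
from stonesoup.predictor.particle import ParticlePredictor
from stonesoup.resampler.particle import SystematicResampler
from stonesoup.sensormanager import BruteForceSensorManager
from stonesoup.sensormanager.reward import ExpectedKLDivergence
from stonesoup.updater.particle import ParticleUpdater

transition_model = CombinedLinearGaussianTransitionModel([ConstantVelocity(0.05)] * 2)
predictor = ParticlePredictor(transition_model)
updater = ParticleUpdater(measurement_model=None, resampler=SystematicResampler())

reward_function = ExpectedKLDivergence(predictor=predictor, updater=updater)

# `sensor` is assumed to be an actionable sensor (e.g. a rotating radar) and `tracks`
# a set of Track objects whose latest states are ParticleStates.
sensor_manager = BruteForceSensorManager(sensors={sensor}, reward_function=reward_function)
chosen_actions = sensor_manager.choose_actions(tracks, timestamp=datetime.datetime.now())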


class MultiUpdateExpectedKLDivergence(ExpectedKLDivergence):
"""A reward function that implements the Kullback-Leibler divergence
for quantifying relative information gain between actions taken by
a sensor or group of sensors.

From a configuration of sensors and actions, multiple expected measurements per
track are generated based on the predicted distribution and an action being taken.
The measurements are generated by resampling the particle state down to a
subsample with length specified by the user. Updates are generated for each of
these measurements and the Kullback-Leibler divergence is calculated for each
of them.
"""

updates_per_track: int = Property(default=2,
doc="Number of measurements to generate from each "
"track prediction. This should be > 1.")

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.KLD = KLDivergence()
if self.predictor is not None and not isinstance(self.predictor, ParticlePredictor):
raise NotImplementedError('Only ParticlePredictor types are currently compatible '
'with this reward function')
if self.updater is not None and not isinstance(self.updater, ParticleUpdater):
raise NotImplementedError('Only ParticleUpdater types are currently compatible '
'with this reward function')
if self.updates_per_track < 2:
raise ValueError(f'updates_per_track = {self.updates_per_track}. This reward '
f'function requires a value >= 2')

def _generate_detections(self, predicted_tracks, sensor):

detections = {}

resampler = SystematicResampler()

for predicted_track in predicted_tracks:

measurement_sources = resampler.resample(predicted_track[-1],
nparts=self.updates_per_track)

track_detections = set()
for state in measurement_sources.state_vector:
track_detections.update(sensor.measure({State(state)}, noise=True))

detections.update({predicted_track: track_detections})

return detections
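
The multi-update variant is constructed the same way; a brief sketch reusing the placeholder predictor and updater from the sketch above, where each track prediction is resampled down to updates_per_track measurement sources:

from stonesoup.sensormanager.reward import MultiUpdateExpectedKLDivergence

# Generates 5 expected measurements per track prediction and accumulates the
# divergence over the resulting updates (updates_per_track must be >= 2).
multi_reward = MultiUpdateExpectedKLDivergence(predictor=predictor, updater=updater,
                                               updates_per_track=5)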
