Skip to content

Commit

Permalink
Add a SingleTargetMixtureTracker (#825)
Browse files Browse the repository at this point in the history
This class helps to deal with the single target tracking using a probability data associator like PDA because, currently, using SingleTargetTracker would have ended in an error. This component is built following the MultiTargetMixtureTracker structure.
  • Loading branch information
A-acuto committed Aug 9, 2023
1 parent ce4114a commit 7ab2187
Show file tree
Hide file tree
Showing 2 changed files with 121 additions and 1 deletion.
102 changes: 102 additions & 0 deletions stonesoup/tracker/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,108 @@ def __next__(self):
return time, self.tracks


class SingleTargetMixtureTracker(Tracker):
""" A simple single target tracking that receives associations from a
(Gaussian) Mixture associator.
Track single objects using Stone Soup components. The tracker works by
first calling the :attr:`data_associator` with the active track, and then
either updating the track state with the result of the
:attr:`data_associator` that reduces the (Gaussian) Mixture of all
possible track-detection associations, or with the prediction if no
detection is associated to the track.
The track is then checked for deletion
by the :attr:`deleter`, and remaining unassociated detections are passed
to the :attr:`initiator` to generate new track.
Parameters
----------
"""
initiator: Initiator = Property(doc="Initiator used to initialise the track.")
deleter: Deleter = Property(doc="Initiator used to initialise the track.")
detector: DetectionReader = Property(doc="Detector used to generate detection objects.")
data_associator: DataAssociator = Property(
doc="Association algorithm to pair predictions to detections")
updater: Updater = Property(doc="Updater used to update the track object to the new state.")

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._track = None

@property
def tracks(self):
return {self._track} if self._track else set()

def __iter__(self):
self.detector_iter = iter(self.detector)
return super().__iter__()

def __next__(self):
time, detections = next(self.detector_iter)

if self._track is not None:
associations = self.data_associator.associate(
self.tracks, detections, time)

unassociated_detections = set(detections)
for track, multihypothesis in associations.items():

# calculate the Track's state as a Gaussian Mixture of
# its possible associations with each detection, then
# reduce the Mixture to a single Gaussian State
posterior_states = []
posterior_state_weights = []
for hypothesis in multihypothesis:
if not hypothesis:
posterior_states.append(hypothesis.prediction)
else:
posterior_states.append(
self.updater.update(hypothesis))
posterior_state_weights.append(
hypothesis.probability)

means = StateVectors([state.state_vector for state in posterior_states])
covars = np.stack([state.covar for state in posterior_states], axis=2)
weights = np.asarray(posterior_state_weights)

# Recuce the mixture of states to one posterior estimate Gaussian
post_mean, post_covar = gm_reduce_single(means, covars, weights)

missed_detection_weight = next(hyp.weight for hyp in multihypothesis if not hyp)

# Check if at least one reasonable measurement...
if any(hypothesis.weight > missed_detection_weight
for hypothesis in multihypothesis):
# ...and if so use update type
track.append(GaussianStateUpdate(
post_mean, post_covar,
multihypothesis,
multihypothesis[0].measurement.timestamp))
else:
# ...and if not, treat as a prediction
track.append(GaussianStatePrediction(
post_mean, post_covar,
multihypothesis[0].prediction.timestamp))

# any detections in multihypothesis that had an
# association score (weight) lower than or equal to the
# association score of "MissedDetection" is considered
# unassociated - candidate for initiating a new Track
for hyp in multihypothesis:
if hyp.weight > missed_detection_weight:
if hyp.measurement in unassociated_detections:
unassociated_detections.remove(hyp.measurement)

if self._track is None or self.deleter.delete_tracks(self.tracks):
new_tracks = self.initiator.initiate(detections, time)
if new_tracks:
self._track = new_tracks.pop()
else:
self._track = None

return time, self.tracks


class MultiTargetTracker(Tracker):
"""A simple multi target tracker.
Expand Down
20 changes: 19 additions & 1 deletion stonesoup/tracker/tests/test_simple.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import datetime
from ..simple import SingleTargetTracker, MultiTargetTracker, \
MultiTargetMixtureTracker
MultiTargetMixtureTracker, SingleTargetMixtureTracker


def test_single_target_tracker(
Expand All @@ -22,6 +22,24 @@ def test_single_target_tracker(
assert len(total_tracks) >= 2 # Should of had at least 2 over all steps


def test_single_target_mixture_tracker(
initiator, deleter, detector, data_mixture_associator, updater):
tracker = SingleTargetMixtureTracker(
initiator, deleter, detector, data_mixture_associator, updater)

previous_time = datetime.datetime(2018, 1, 1, 13, 59)
total_tracks = set()
for time, tracks in tracker:
assert time == previous_time + datetime.timedelta(minutes=1)
assert len(tracks) <= 1 # Shouldn't have more than one track
for track in tracks:
assert len(track.states) <= 10 # Deleter should delete them
total_tracks |= tracks

previous_time = time
assert len(total_tracks) >= 2


def test_multi_target_tracker(
initiator, deleter, detector, data_associator, updater):
tracker = MultiTargetTracker(
Expand Down

0 comments on commit 7ab2187

Please sign in to comment.