-
Notifications
You must be signed in to change notification settings - Fork 124
/
MFA_example.py
244 lines (205 loc) · 9.63 KB
/
MFA_example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
#!/usr/bin/env python
"""
Multi-Frame Assignment example
==============================
This notebook includes an example of the Multi-Frame Assignment (MFA) algorithm [#]_.
The multi-frame assignment algorithm maintains and prunes hypotheses over *N*-Scans. This enables
the global optimum assignment choice to be selected over *N* multiple frames/timesteps/scans.
"""
# %%
# Global parameters
# -----------------
import numpy as np
from scipy.stats import chi2
np.random.seed(2001) # Used to set the seed for the numpy random number generator
prob_detect = 0.9 # Prob. of detection
gate_level = 8 # Gate level
prob_gate = chi2.cdf(gate_level, 2) # Prob. of gating, computed from gate level for hypothesiser
v_bounds = np.array([[-5, 30], [-5, 30]]) # Surveillance area bounds
lambdaV = 5 # Mean number of clutter points over the entire surveillance area
lambda_ = lambdaV/(np.prod(v_bounds[:, 0] - v_bounds[:, 1])) # Clutter density per unit volume
slide_window = 3 # Slide window; used by MFA data associator
# %%
# Plotting functions
# ------------------
# First we'll need some custom plotting functions, used to show the different assignments
# present within the slide window.
def plot_covar(state, ax, color=None):
w, v = np.linalg.eig(
measurement_model.matrix() @ state.covar @ measurement_model.matrix().T)
max_ind = np.argmax(w)
min_ind = np.argmin(w)
orient = np.arctan2(v[1, max_ind], v[0, max_ind])
ellipse = Ellipse(xy=state.state_vector[(0, 2), 0],
width=2 * np.sqrt(w[max_ind]),
height=2 * np.sqrt(w[min_ind]),
angle=np.rad2deg(orient),
alpha=0.2,
color=color)
ax.add_artist(ellipse)
return ellipse
def plot_tracks(tracks, ax, slide_window=None):
artists = []
for plot_style, track in zip(('r-', 'b-'), tracks):
mini_tracks = []
hist_window = len(track) if (slide_window is None or slide_window > len(track)) else slide_window
for component in track.state.components:
child_tag = component.tag
parents = []
for j in range(1, hist_window):
parent = next(comp for comp in track.states[-(j+1)].components
if comp.tag == child_tag[:-j])
parents.append(parent)
parents.reverse()
parents.append(component)
mini_tracks.append(parents)
drawn_states = set()
for mini_track in mini_tracks:
# Avoid re-plotting drawn trajectory
states_to_plot = [state for state in mini_track if state not in drawn_states]
# Insert state before so line is drawn
if len(states_to_plot) < len(mini_track):
states_to_plot.insert(0, mini_track[-(len(states_to_plot)+1)])
artists.extend(ax.plot([state.state_vector[0, 0] for state in states_to_plot],
[state.state_vector[2, 0] for state in states_to_plot],
plot_style))
# Avoid re-plotting drawn error ellipses
for state in set(states_to_plot) - drawn_states:
artists.append(plot_covar(state, ax, plot_style[0]))
drawn_states.add(state)
return artists
# %%
# Models
# ------
# Create models for target motion :math:`[x \ \dot{x} \ y \ \dot{y}]` with near constant velocity
# and measurements :math:`[x \ y]`.
from stonesoup.models.transition.linear import (
CombinedLinearGaussianTransitionModel, ConstantVelocity)
from stonesoup.models.measurement.linear import LinearGaussian
transition_model = CombinedLinearGaussianTransitionModel([ConstantVelocity(0.005),
ConstantVelocity(0.005)])
measurement_model = LinearGaussian(ndim_state=4, mapping=(0, 2),
noise_covar=np.array([[0.7**2, 0],
[0, 0.7**2]]))
# %%
# Simulate ground truth
# ---------------------
# Simulate two targets moving at near constant velocity, crossing each other.
import datetime
from ordered_set import OrderedSet
from stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState
truths = OrderedSet()
start_time = datetime.datetime.now()
truth = GroundTruthPath([GroundTruthState([0, 1, 0, 1], timestamp=start_time)])
for k in range(1, 21):
truth.append(GroundTruthState(
transition_model.function(truth[k-1], noise=True, time_interval=datetime.timedelta(seconds=1)),
timestamp=start_time+datetime.timedelta(seconds=k)))
truths.add(truth)
truth = GroundTruthPath([GroundTruthState([0, 1, 20, -1], timestamp=start_time)])
for k in range(1, 21):
truth.append(GroundTruthState(
transition_model.function(truth[k-1], noise=True, time_interval=datetime.timedelta(seconds=1)),
timestamp=start_time+datetime.timedelta(seconds=k)))
_ = truths.add(truth)
# %%
# Simulate measurements
# ---------------------
# Simulate both measurements and clutter, using parameters and models.
from stonesoup.types.detection import TrueDetection
from stonesoup.types.detection import Clutter
scans = []
for k in range(20):
detections = OrderedSet()
for truth in truths:
# Generate actual detection from the state with a chance that no detection is received.
if np.random.rand() <= prob_detect:
measurement = measurement_model.function(truth[k], noise=True)
detections.add(TrueDetection(state_vector=measurement,
groundtruth_path=truth,
timestamp=truth[k].timestamp))
# Generate clutter at this time-step
truth_x = truth[k].state_vector[0]
truth_y = truth[k].state_vector[2]
for _ in range(np.random.poisson(lambdaV)):
x = np.random.uniform(*v_bounds[0, :])
y = np.random.uniform(*v_bounds[1, :])
detections.add(Clutter([[x], [y]], timestamp=truth[k].timestamp))
scans.append(detections)
# %%
# Tracking Components
# -------------------
# %%
# For predictor and updater a linear Kalman filter.
from stonesoup.predictor.kalman import KalmanPredictor
from stonesoup.updater.kalman import KalmanUpdater
predictor = KalmanPredictor(transition_model)
updater = KalmanUpdater(measurement_model)
# %%
# For Hypothesiser and Data Associator, the MFA components will be used, wrapping
# a PDA hypothesiser.
from stonesoup.dataassociator.mfa import MFADataAssociator
from stonesoup.hypothesiser.mfa import MFAHypothesiser
from stonesoup.hypothesiser.probability import PDAHypothesiser
hypothesiser = PDAHypothesiser(predictor, updater, lambda_, prob_gate=prob_gate, prob_detect=prob_detect)
hypothesiser = MFAHypothesiser(hypothesiser)
data_associator = MFADataAssociator(hypothesiser, slide_window=slide_window)
# %%
# Estimate
# --------
# Initiate priors and tracks. With MFA, a Gaussian mixture is used, where the component
# represents the estimate as updated using the detections (or missed detection) assignments
# as recorded on the components tag.
from stonesoup.types.state import TaggedWeightedGaussianState
from stonesoup.types.track import Track
from stonesoup.types.mixture import GaussianMixture
from stonesoup.types.numeric import Probability
prior1 = GaussianMixture([TaggedWeightedGaussianState([[0], [1], [0], [1]],
np.diag([1.5, 0.5, 1.5, 0.5]),
timestamp=start_time,
weight=Probability(1), tag=[])])
prior2 = GaussianMixture([TaggedWeightedGaussianState([[0], [1], [20], [-1]],
np.diag([1.5, 0.5, 1.5, 0.5]),
timestamp=start_time,
weight=Probability(1), tag=[])])
tracks = OrderedSet((Track([prior1]), Track([prior2])))
# %%
# And finally run through the scans, and plot the track and assignments
# within slide window at each frame/timestep.
from matplotlib import animation
from matplotlib.patches import Ellipse
import matplotlib
matplotlib.rcParams['animation.html'] = 'jshtml'
from stonesoup.plotter import Plotter
from stonesoup.types.update import GaussianMixtureUpdate
plotter = Plotter()
frames = []
for n, detections in enumerate(scans):
artists = []
timestamp = start_time + datetime.timedelta(seconds=n)
associations = data_associator.associate(tracks, detections, timestamp)
for track, hypotheses in associations.items():
components = []
for hypothesis in hypotheses:
if not hypothesis:
components.append(hypothesis.prediction)
else:
update = updater.update(hypothesis)
components.append(update)
track.append(GaussianMixtureUpdate(components=components, hypothesis=hypotheses))
plotter.ax.set_xlabel("$x$")
plotter.ax.set_ylabel("$y$")
plotter.ax.set_xlim(*v_bounds[0])
plotter.ax.set_ylim(*v_bounds[1])
artists.extend(plotter.plot_ground_truths([truth[:n+1] for truth in truths], [0, 2]))
artists.extend(plotter.plot_measurements(detections, [0, 2], measurement_model))
# Plot the resulting tracks.
artists.extend(plot_tracks(tracks, plotter.ax))
frames.append(artists)
animation.ArtistAnimation(plotter.fig, frames)
# %%
# References
# ----------
# .. [#] Xia, Y., Granström, K., Svensson, L., García-Fernández, Á.F., and Williams, J.L.,
# 2019. Multiscan Implementation of the Trajectory Poisson Multi-Bernoulli Mixture Filter.
# J. Adv. Information Fusion, 14(2), pp. 213–235.