-
Notifications
You must be signed in to change notification settings - Fork 124
/
EnsembleFilterExample.py
183 lines (140 loc) · 6.29 KB
/
EnsembleFilterExample.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
#!/usr/bin/env python
# coding: utf-8
"""
=======================
Ensemble Filter Example
=======================
"""
# %%
# The Ensemble Kalman Filter (EnKF) is a hybrid of the Kalman updating scheme and the Monte Carlo
# approach of the particle filter. The EnKF provides an alternative to the Kalman Filter (and
# extensions of) which is specifically designed for high-dimensional states.
#
# EnKF can be applied to both non-linear and non-Gaussian state-spaces due to being completely
# based on sampling.
#
# Multiple versions of EnKF are implemented in Stone Soup - all make use of the same prediction
# step, but implement different versions of the update step. Namely, the updaters are:
#
# - :class:`~.EnsembleUpdater`
# - :class:`~.LinearisedEnsembleUpdater`
# - :class:`~.RecursiveLinearisedEnsembleUpdater`
# - :class:`~.RecursiveEnsembleUpdater`
#
# The :class:`~.EnsembleUpdater` is deliberately structured to resemble the Vanilla Kalman Filter,
# :meth:`update` first calls the :meth:`predict_measurement` method, which
# proceeds by calculating the predicted measurement, innovation covariance
# and measurement cross-covariance. Note however, these are not propagated
# explicitly, they are derived from the sample covariance of the ensemble itself.
#
# The :class:`~.LinearisedEnsembleUpdater` is an implementation of 'The Linearized EnKF Update'
# algorithm from "Ensemble Kalman Filter with Bayesian Recursive Update" by Kristen Michaelson,
# Andrey A. Popov and Renato Zanetti. Similar to the EnsembleUpdater, but uses a different form
# of Kalman gain. This alternative form of the EnKF calculates a separate Kalman gain for each
# ensemble member. This alternative Kalman gain calculation involves linearization of the
# measurement. An additional step is introduced to perform inflation.
#
# The :class:`~.RecursiveLinearisedEnsembleUpdater` is an implementation of 'The Bayesian
# Recursive Update Linearized EnKF' algorithm from "Ensemble Kalman Filter with Bayesian
# Recursive Update" by Kristen Michaelson, Andrey A. Popov and Renato Zanetti. It is an
# extended version of the LinearisedEnsembleUpdater that recursively iterates over the
# update step for a given number of iterations. This recursive version is designed to
# minimise the error caused by linearisation.
#
# The :class:`~.RecursiveEnsembleUpdater` is an extended version of the
# :class:`~.EnsembleUpdater` which recursively iterates over the update step.
# %%
# Example using stonesoup
# -----------------------
# Package imports
# ^^^^^^^^^^^^^^^
import numpy as np
from datetime import datetime, timedelta
# %%
# Start time of simulation
# ^^^^^^^^^^^^^^^^^^^^^^^^
# Fix the RNG seed first so the noisy ground truth and measurements generated
# below are reproducible, then record when the simulation starts.
np.random.seed(1991)
start_time = datetime.now()
# %%
# Generate and plot ground truth
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
from stonesoup.models.transition.linear import (
    CombinedLinearGaussianTransitionModel, ConstantVelocity)
from stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState
from stonesoup.plotter import Plotterly

# Near-constant-velocity target in 2D: one ConstantVelocity model per
# Cartesian dimension, each with noise intensity 0.05.
transition_model = CombinedLinearGaussianTransitionModel(
    [ConstantVelocity(0.05), ConstantVelocity(0.05)])

# Start at the origin moving diagonally, then propagate for 20 one-second
# steps with process noise sampled from the transition model.
truth = GroundTruthPath([GroundTruthState([0, 1, 0, 1], timestamp=start_time)])
step_interval = timedelta(seconds=1)
for step in range(1, 21):
    propagated = transition_model.function(
        truth[step - 1], noise=True, time_interval=step_interval)
    truth.append(GroundTruthState(
        propagated, timestamp=start_time + timedelta(seconds=step)))
# %%
plotter = Plotterly()
plotter.plot_ground_truths(truth, [0, 2])
plotter.fig
# %%
# Generate and plot detections
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
from stonesoup.models.measurement.nonlinear import CartesianToBearingRange

# Bearing/range sensor placed off-centre at (50, 0).
sensor_x = 50
sensor_y = 0
measurement_model = CartesianToBearingRange(
    ndim_state=4,
    mapping=(0, 2),
    # Covariance matrix: 0.2 degree variance in bearing, 1 metre in range.
    noise_covar=np.diag([np.radians(0.2), 1]),
    # Offset measurements to the sensor's location in Cartesian space.
    translation_offset=np.array([[sensor_x], [sensor_y]]),
)
# %%
from stonesoup.types.detection import Detection

# One noisy bearing/range detection per ground-truth state.
measurements = [
    Detection(measurement_model.function(truth_state, noise=True),
              timestamp=truth_state.timestamp,
              measurement_model=measurement_model)
    for truth_state in truth
]
# %%
plotter.plot_measurements(measurements, [0, 2])
plotter.fig
# %%
# Set up predictor and updater
# ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
# In this EnKF example we must use the :class:`~.EnsemblePredictor`, and choose to use the
# standard :class:`~.EnsembleUpdater`. Note that we could instantiate any of the other (ensemble)
# updaters mentioned in this example, in place of the :class:`~.EnsembleUpdater`.
from stonesoup.predictor.ensemble import EnsemblePredictor
from stonesoup.updater.ensemble import EnsembleUpdater

# The predictor propagates the ensemble through the transition model; the
# updater corrects the ensemble using each incoming measurement.
predictor = EnsemblePredictor(transition_model)
updater = EnsembleUpdater(measurement_model)
# %%
# Prior state
# ^^^^^^^^^^^
# For the simulation we must provide a prior state. The Ensemble Filter in stonesoup requires
# this to be an :class:`~.EnsembleState`. We generate an :class:`~.EnsembleState` by calling
# :meth:`~.generate_ensemble`. The prior state stores the prior state vector - see below that
# for the :class:`~.EnsembleState`, the `state_vector` must be a :class:`~.StateVectors` object.
from stonesoup.types.state import EnsembleState

# Draw 100 samples about the initial state estimate to form the prior
# ensemble; the resulting state vector is a StateVectors object.
prior_mean = np.array([[0], [1], [0], [1]])
prior_covar = np.diag([1.5, 0.5, 1.5, 0.5])
ensemble = EnsembleState.generate_ensemble(
    mean=prior_mean, covar=prior_covar, num_vectors=100)
prior = EnsembleState(state_vector=ensemble, timestamp=start_time)
# %%
type(prior.state_vector)
# %%
# Run the EnKF
# ^^^^^^^^^^^^
# Here we run the Ensemble Kalman Filter and plot the results. By marking flag `particle=True`,
# we plot each member of the ensembles. As usual, the ellipses represent the uncertainty in the
# tracks.
from stonesoup.types.hypothesis import SingleHypothesis
from stonesoup.types.track import Track

# Standard predict-update cycle: each posterior becomes the prior for the
# next timestep.
track = Track()
for detection in measurements:
    prediction = predictor.predict(prior, timestamp=detection.timestamp)
    # Group the prediction and its measurement, then update the ensemble.
    posterior = updater.update(SingleHypothesis(prediction, detection))
    track.append(posterior)
    prior = posterior
plotter.plot_tracks(track, [0, 2], uncertainty=True, particle=True)
plotter.fig