In [26]:
from datetime import datetime
from datetime import timedelta
from ordered_set import OrderedSet
import numpy as np
from scipy.stats import uniform

from stonesoup.models.transition.linear import CombinedLinearGaussianTransitionModel, \
                                               ConstantVelocity
from stonesoup.types.groundtruth import GroundTruthPath, GroundTruthState
from stonesoup.types.detection import TrueDetection
from stonesoup.types.detection import Clutter
from stonesoup.models.measurement.linear import LinearGaussian

np.random.seed(1993)

truths = OrderedSet()
num_steps = 20

start_time = datetime.now().replace(microsecond=0)
transition_model = CombinedLinearGaussianTransitionModel([ConstantVelocity(0.01),
                                                          ConstantVelocity(0.01)])

timesteps = [start_time]
truth = GroundTruthPath([GroundTruthState([0, 1, 0, 1], timestamp=timesteps[0])])
for k in range(1, num_steps+1):
    timesteps.append(start_time + timedelta(seconds=k))
    truth.append(GroundTruthState(
        transition_model.function(truth[k-1], noise=True, time_interval=timedelta(seconds=1)),
        timestamp=timesteps[k]))
truths.add(truth)

truth = GroundTruthPath([GroundTruthState([100, -1, 0, 1], timestamp=timesteps[0])])
for k in range(1, num_steps+1):
    truth.append(GroundTruthState(
        transition_model.function(truth[k-1], noise=True, time_interval=timedelta(seconds=1)),
        timestamp=timesteps[k]))
truths.add(truth)

truth = GroundTruthPath([GroundTruthState([0, 1, 100, -1], timestamp=timesteps[0])])
for k in range(1, num_steps+1):
    truth.append(GroundTruthState(
        transition_model.function(truth[k-1], noise=True, time_interval=timedelta(seconds=1)),
        timestamp=timesteps[k]))
truths.add(truth)

truth = GroundTruthPath([GroundTruthState([100, -1, 100, -1], timestamp=timesteps[0])])
for k in range(1, num_steps+1):
    truth.append(GroundTruthState(
        transition_model.function(truth[k-1], noise=True, time_interval=timedelta(seconds=1)),
        timestamp=timesteps[k]))
truths.add(truth)

# Plot ground truth.
from stonesoup.plotter import Plotterly
plotter = Plotterly()
plotter.plot_ground_truths(truths, [0, 2])

# Generate measurements.
all_measurements = []

measurement_model = LinearGaussian(
    ndim_state=4,
    mapping=(0, 2),
    noise_covar=np.array([[0.75, 0],
                          [0, 0.75]])
    )

prob_detect = 0.9  # 90% chance of detection.

for k in range(num_steps):
    measurement_set = set()

    for truth in truths:
        # Generate actual detection from the state with a 10% chance that no detection is received.
        if np.random.rand() <= prob_detect:
            measurement = measurement_model.function(truth[k], noise=True)
            measurement_set.add(TrueDetection(state_vector=measurement,
                                              groundtruth_path=truth,
                                              timestamp=truth[k].timestamp,
                                              measurement_model=measurement_model))

        # Generate clutter at this time-step
        truth_x = truth[k].state_vector[0]
        truth_y = truth[k].state_vector[2]
        for _ in range(np.random.randint(10)):
            x = uniform.rvs(truth_x - 10, 20)
            y = uniform.rvs(truth_y - 10, 20)
            measurement_set.add(Clutter(np.array([[x], [y]]), timestamp=truth[k].timestamp,
                                        measurement_model=measurement_model))
    all_measurements.append(measurement_set)

# Plot true detections and clutter.
plotter.plot_measurements(all_measurements, [0, 2])
#plotter.fig

In [27]:
from stonesoup.predictor.kalman import KalmanPredictor
from stonesoup.updater.kalman import KalmanUpdater
from stonesoup.hypothesiser.probability import PDAHypothesiser
from stonesoup.dataassociator.probability import JPDA

predictor = KalmanPredictor(transition_model)
updater = KalmanUpdater(measurement_model)
hypothesiser = PDAHypothesiser(predictor, updater, clutter_spatial_density = 0.125, prob_detect = prob_detect)
data_associator = JPDA(hypothesiser = hypothesiser)

In [28]:
from stonesoup.types.state import GaussianState
from stonesoup.types.track import Track
from stonesoup.types.array import StateVectors
from stonesoup.functions import gm_reduce_single
from stonesoup.types.update import GaussianStateUpdate

prior1 = GaussianState([[0], [1], [0], [1]], np.diag([1.5, 0.5, 1.5, 0.5]), timestamp = start_time)
prior2 = GaussianState([[0], [1], [100], [-1]], np.diag([1.5, 0.5, 1.5,0.5]), timestamp = start_time)
prior3 = GaussianState([[100], [-1], [0], [1]], np.diag([1.5, 0.5, 1.5,0.5]), timestamp =start_time)
prior4 = GaussianState([[100], [-1], [100], [-1]], np.diag([1.5, 0.5, 1.5,0.5]), timestamp =start_time)

tracks = {Track([prior1]), Track([prior2]), Track([prior3]), Track([prior4])}

for n, measurements in enumerate(all_measurements):
    #print(tracks)
    print(measurements)
    hypotheses = data_associator.associate(tracks, measurements, start_time+timedelta(seconds = n))
    print(f"hypotheses: {hypotheses}")
    for track in tracks:
        track_hypotheses = hypotheses[track]
        print(f"track: {track}")
        posterior_states = []
        posterior_state_weights = []

        for hypothesis in track_hypotheses:
            #print(hypothesis)
            if not hypothesis:
                posterior_states.append(hypothesis.prediction)
            else:
                posterior_state = updater.update(hypothesis)
                posterior_states.append(posterior_state)
            posterior_state_weights.append(hypothesis.probability)
        
        means = StateVectors([state.state_vector for state in posterior_states])
        covars = np.stack([state.covar for state in posterior_states], axis = 2)
        weights = np.asarray(posterior_state_weights)

        post_mean, post_covar = gm_reduce_single(means, covars, weights)

        track.append(GaussianStateUpdate(post_mean, post_covar, track_hypotheses, track_hypotheses[0].measurement.timestamp))

{Clutter(
    state_vector=StateVector([[7.46701535],
                              [0.46082479]]),
    timestamp=2023-09-15 10:40:56,
    measurement_model=LinearGaussian(
                          ndim_state=4,
                          mapping=(0, 2),
                          noise_covar=CovarianceMatrix([[0.75, 0.  ],
                                                        [0.  , 0.75]]),
                          seed=None),
    metadata={}), Clutter(
    state_vector=StateVector([[-1.52452393],
                              [-0.32184504]]),
    timestamp=2023-09-15 10:40:56,
    measurement_model=LinearGaussian(
                          ndim_state=4,
                          mapping=(0, 2),
                          noise_covar=CovarianceMatrix([[0.75, 0.  ],
                                                        [0.  , 0.75]]),
                          seed=None),
    metadata={}), Clutter(
    state_vector=StateVector([[ 5.8571374 ],
                              [-2.51364

KeyboardInterrupt: 

In [None]:
plotter.plot_tracks(tracks, [0,2], uncertainty = True)
plotter.fig