In [1]:
# %load_ext autoreload
# %autoreload 2

In [2]:
from multiprocessing import Pool
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np

from generator import generate_tracks_paper, generate_tracks_time_varying, generate_tracks_sudden_birth
from runners import generate_tests, run_test_from_config
from test_case import TestUseCase2D

from gmphd_fusion.data import StateVector, CovarianceMatrix
from gmphd_fusion.gm import Gaussian, GaussianMixture
from gmphd_fusion.measurement_model import LinearCoordinateMeasurementModel
from gmphd_fusion.motion_models import ConstantVelocityMotionModel

In [3]:
# Start from the "bmh" style, then override the figure size and the axes colours
# one rcParams key at a time.
plt.style.use("bmh")
plt.rcParams["figure.figsize"] = (24, 12)
plt.rcParams["axes.facecolor"] = "white"
plt.rcParams["axes.edgecolor"] = "black"

# Define testing use cases

In [4]:
# Run count handed to generate_tests.
N_RUNS = 100
# Directory handed to generate_tests; relative to the notebook's working directory.
EXPERIMENTS_DIR = Path("../experiments")

# Parameter values to sweep over. The keys match TestUseCase2D keyword arguments.
# round() strips binary floating-point noise from the detection-probability grid
# (e.g. 0.7 + 0.05 * 2 == 0.7999999999999999), so the values are exactly
# 0.7, 0.75, ..., 1.0.
TEST_PARAMS = dict(
    clutter_rate=[5.0 * k for k in range(1, 11)],  # 5.0, 10.0, ..., 50.0
    detection_prob=[round(0.7 + 0.05 * k, 2) for k in range(7)],
)

def _birth_mixture(positions, velocity_var):
    """Build the birth mixture shared in shape by all test cases.

    One Gaussian is created per ``(a, b)`` pair in ``positions``: its mean is
    ``[a, b, 0, 0]``, its covariance is ``diag(100, 100, velocity_var, velocity_var)``
    and its label is ``Gaussian.BIRTH_LABEL``. Every component gets weight 0.1.
    NOTE(review): the state layout is presumably [x, y, vx, vy] -- confirm against
    ConstantVelocityMotionModel.
    """
    return GaussianMixture(
        gaussians=[
            Gaussian(
                mean=StateVector([first, second, 0, 0]),
                cov=CovarianceMatrix(np.diag([100, 100, velocity_var, velocity_var])),
                label=Gaussian.BIRTH_LABEL,
            )
            for first, second in positions
        ],
        weights=[0.1] * len(positions),
    )


def _single_gaussian_mixture(mean, cov_diag, weight):
    """Build a one-component, unlabeled mixture with a diagonal covariance."""
    return GaussianMixture(
        gaussians=[
            Gaussian(StateVector(mean), cov=CovarianceMatrix(np.diag(cov_diag)), label=None),
        ],
        weights=[weight],
    )


def _make_use_case(birth_positions, birth_velocity_var, tracks_true, **extra):
    """Create a TestUseCase2D with the settings common to every test case.

    Only the birth mixture and the ground-truth tracks vary between cases.
    ``extra`` is forwarded unchanged to ``TestUseCase2D`` (used for ``fuse``), so
    arguments that a case does not specify keep the class defaults.
    Fresh mixture and model objects are built on every call, so no state is
    shared between the returned use cases.
    """
    return TestUseCase2D(
        surveillance_region=((-1000., -1000.), (1000., 1000.)),
        clutter_rate=50,
        detection_prob=0.98,
        survival_prob=0.99,
        prune_threshold=1e-5,
        merge_threshold=4,
        max_components=1000,
        init_gm=GaussianMixture(),
        birth_gm=_birth_mixture(birth_positions, birth_velocity_var),
        motion_model=ConstantVelocityMotionModel(motion_noise=5),
        measurement_model=LinearCoordinateMeasurementModel(dim_measurement=2, dim_state=4, measurement_noise=10),
        tracks_true=tracks_true,
        cpep_radius=20,
        target_weight_threshold=0.5,
        **extra,
    )


# Birth locations used by the three time-varying / sudden-birth scenarios.
_SHARED_BIRTH_POSITIONS = [(-1000, 750), (1000, -750)]

# The track generators are called in the same order as before
# (paper, time-varying, sudden-birth, sudden-birth).
TEST_CASES = dict(
    two_obj_cross=_make_use_case(
        birth_positions=[(250, 250), (-250, -250)],
        birth_velocity_var=25,
        tracks_true=generate_tracks_paper(),
    ),
    birth_death_vary=_make_use_case(
        birth_positions=_SHARED_BIRTH_POSITIONS,
        birth_velocity_var=30,
        tracks_true=generate_tracks_time_varying(),
    ),
    birth_no_fusion=_make_use_case(
        birth_positions=_SHARED_BIRTH_POSITIONS,
        birth_velocity_var=30,
        tracks_true=generate_tracks_sudden_birth(),
    ),
    # Same scenario as birth_no_fusion, plus external mixtures supplied via `fuse`.
    # NOTE(review): the dict keys (20, 60) are presumably time steps -- confirm
    # against TestUseCase2D.
    birth_fusion=_make_use_case(
        birth_positions=_SHARED_BIRTH_POSITIONS,
        birth_velocity_var=30,
        tracks_true=generate_tracks_sudden_birth(),
        fuse={
            20: _single_gaussian_mixture([-1050, -1050, 25, 25], [100, 100, 10, 10], 0.6),
            60: _single_gaussian_mixture([1020, 1020, -22, -22], [40, 40, 5, 5], 0.7),
        },
    ),
)

# Run

In [5]:
# Build the run configurations from the test cases and the parameter grids;
# each entry is later passed to run_test_from_config.
run_configurations = generate_tests(
    N_RUNS,
    TEST_CASES,
    TEST_PARAMS,
    EXPERIMENTS_DIR,
)

In [6]:
%%time
pool = Pool(8)
success_flags = pool.map(run_test_from_config, run_configurations)
# success_flags

CPU times: user 34.8 ms, sys: 69.3 ms, total: 104 ms
Wall time: 8.84 s


[True, True, True, True, True, True, True, True]

In [7]:
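# Serial alternative to the pool above (kept commented out): runs the first 8
# configurations one by one in the notebook process.
# from runners import run_test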
#
# for i, config in enumerate(run_configurations[:8]):
#     print(i)
#     args, kwargs = config
#     run_test(*args, **kwargs)